```
├── .github/
├── FUNDING.yml
├── ISSUE_TEMPLATE/
├── bug_report.md
├── feature_request.md
├── workflows/
├── codeql-analyze.yaml
├── docker.yaml
├── qodana_code_quality.yml
├── .gitignore
├── LICENSE
├── README.md
├── SECURITY.md
├── docker/
├── Dockerfile
├── README.md
├── compose.yaml
├── entrypoint.sh
├── package-lock.json
├── package.json
├── qodana.yaml
├── src/
├── Utilities.py
├── certbot.ini
├── dashboard.py
├── gunicorn.conf.py
├── modules/
├── DashboardLogger.py
├── Email.py
├── Log.py
├── PeerJob.py
├── PeerJobLogger.py
```
## /.github/FUNDING.yml
```yml path="/.github/FUNDING.yml"
# These are supported funding model platforms
github: [donaldzou]
patreon: DonaldDonnyZou
```
## /.github/ISSUE_TEMPLATE/bug_report.md
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: bug
assignees: ''
---
**Describe The Problem**
A clear and concise description of what the bug is.
**Expected Error / Traceback**
```
Please provide the error traceback here
```
**To Reproduce**
Please provide how you run the dashboard
**OS Information:**
- OS: [e.g. Ubuntu 18.02]
- Python Version: [e.g v3.7]
**Sample of your `.conf` file**
```
Please provide a sample of your configuration file that you are having problem with. You can replace your public key and private key to ABCD...
```
## /.github/ISSUE_TEMPLATE/feature_request.md
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: enhancement
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
## /.github/workflows/codeql-analyze.yaml
```yaml path="/.github/workflows/codeql-analyze.yaml"
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
on:
push:
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
schedule:
- cron: '30 5 * * 4'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'javascript', 'python' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://git.io/codeql-language-support
steps:
- name: Checkout repository
uses: actions/checkout@v3
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v3
# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language
#- run: |
# make bootstrap
# make release
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
```
## /.github/workflows/docker.yaml
```yaml path="/.github/workflows/docker.yaml"
name: Docker Scan and Build
on:
push:
branches: [ main ]
schedule:
- cron: "0 0 * * *" # Daily at midnight UTC
workflow_dispatch:
inputs:
trigger-build:
description: 'Trigger a manual build and push'
default: 'true'
env:
DOCKER_IMAGE: donaldzou/wgdashboard
jobs:
docker_build_analyze:
runs-on: ubuntu-latest
strategy:
fail-fast: false
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Log in to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_HUB_USERNAME }}
password: ${{ secrets.DOCKER_HUB_PASSWORD }}
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
with:
platforms: linux/amd64,linux/arm64,linux/arm/v6,linux/arm/v7
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build and export (multi-arch)
uses: docker/build-push-action@v6
with:
context: .
file: ./docker/Dockerfile
push: true
tags: ${{ env.DOCKER_IMAGE }}:latest
platforms: linux/amd64,linux/arm64,linux/arm/v7 #ARM v6 no longer support by go image.
- name: Docker Scout
id: docker-scout
uses: docker/scout-action@v1
with:
command: cves
image: ${{ env.DOCKER_IMAGE }}:latest
only-severities: critical,high,medium,low,unspecified
github-token: ${{ secrets.GITHUB_TOKEN }}
```
## /.github/workflows/qodana_code_quality.yml
```yml path="/.github/workflows/qodana_code_quality.yml"
name: Qodana
on:
workflow_dispatch:
pull_request:
push:
branches: # Specify your branches here
- main # The 'main' branch
- v4.2-dev
jobs:
qodana:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
checks: write
steps:
- uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.sha }} # to check out the actual pull request commit, not the merge commit
fetch-depth: 0 # a full history is required for pull request analysis
- name: 'Qodana Scan'
uses: JetBrains/qodana-action@v2024.3
with:
pr-mode: false
env:
QODANA_TOKEN: ${{ secrets.QODANA_TOKEN_2090978292 }}
QODANA_ENDPOINT: 'https://qodana.cloud'
```
## /.gitignore
```gitignore path="/.gitignore"
.vscode
.DS_Store
.idea
src/db
__pycache__
src/test.py
*.db
src/wg-dashboard.ini
src/static/pic.xd
*.conf
private_key.txt
public_key.txt
venv/**
log/**
release/*
src/db/wgdashboard.db
.jshintrc
node_modules/**
*/proxy.js
src/static/app/proxy.js
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
pnpm-debug.log*
lerna-debug.log*
node_modules
.DS_Store
dist-ssr
coverage
*.local
/cypress/videos/
/cypress/screenshots/
# Editor directories and files
.vscode/*
!.vscode/extensions.json
.idea
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw?
*.tsbuildinfo
.vite/*
```
## /LICENSE
``` path="/LICENSE"
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
```
## /README.md
> [!TIP]
> 🎉 I'm excited to announce that WGDashboard is officially listed on DigitalOcean's Marketplace! For more information, please visit [Host WGDashboard & WireGuard with DigitalOcean](https://donaldzou.dev/WGDashboard-Documentation/host-wgdashboard-wireguard-with-digitalocean.html) for more information!
> [!NOTE]
> **Help Wanted 🎉**: Localizing WGDashboard to other languages! If you're willing to help, please visit https://github.com/donaldzou/WGDashboard/issues/397. Many thanks!

WGDashboard
This project is supported by
Monitoring WireGuard is not convenient, in most case, you'll need to login to your server and type wg show
. That's why this project is being created, to view and manage all WireGuard configurations in a easy way.
With all these awesome features, while keeping it easy to install and use
This project is not affiliate to the official WireGuard Project
Join our Discord Server for quick help, or you wanna chat about this project!
Alternatively, you can also reach out at our Matrix.org Chatroom :)
Matrix.org Chatroom
# [Demo](https://wgd-demo.donaldzou.dev)
If you would like to try out WGDashboard, feel free to access the link above. You won't be able to actually connect it with WireGuard since I blocked all Post/Pre script.
Username: `admin`
Password: `admin`
> **Please don't abuse server 🥺, and do not put any personal information on it.** If you can't access it, [please let me know here](https://github.com/donaldzou/WGDashboard/issues/695).
# [Official Documentation](https://donaldzou.dev/WGDashboard-Documentations)
- [💡 Features](https://donaldzou.github.io/WGDashboard-Documentation/features.html)
- [📝 Requirements](https://donaldzou.github.io/WGDashboard-Documentation/requirements.html)
- [🛠 Install](https://donaldzou.github.io/WGDashboard-Documentation/install.html)
- [🐬 Docker Solutions](https://github.com/donaldzou/WGDashboard/tree/main/docker)
- [🪜 Usage](https://donaldzou.github.io/WGDashboard-Documentation/usage.html)
- [📖 API Documentation](https://donaldzou.github.io/WGDashboard-Documentation/api-documentation.html)
- [And much more...](https://donaldzou.github.io/WGDashboard-Documentation/)
# Screenshots
## /SECURITY.md
# Security Policy
## Supported Versions
| Version | Supported |
| ------- | ------------------ |
| 5.1.x | :white_check_mark: |
| 5.0.x | :x: |
| 4.0.x | :white_check_mark: |
| < 4.0 | :x: |
## /docker/Dockerfile
``` path="/docker/Dockerfile"
FROM golang:1.24 AS compiler
WORKDIR /go
RUN apt-get update && apt-get install -y --no-install-recommends \
git make bash build-essential \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN git clone --depth=1 https://github.com/amnezia-vpn/amneziawg-tools.git \
&& git clone --depth=1 https://github.com/amnezia-vpn/amneziawg-go.git
RUN cd /go/amneziawg-tools/src \
&& make
RUN cd /go/amneziawg-go && \
go get -u ./... && \
go mod tidy && \
make && \
chmod +x /go/amneziawg-go/amneziawg-go /go/amneziawg-tools/src/wg /go/amneziawg-tools/src/wg-quick/linux.bash
RUN echo "DONE AmneziaWG"
### INTERMEDIATE STAGE
FROM scratch AS bins
COPY --from=compiler /go/amneziawg-go/amneziawg-go /amneziawg-go
COPY --from=compiler /go/amneziawg-tools/src/wg /awg
COPY --from=compiler /go/amneziawg-tools/src/wg-quick/linux.bash /awg-quick
# FINAL STAGE
FROM alpine:latest
LABEL maintainer="dselen@nerthus.nl"
COPY --from=bins /amneziawg-go /usr/bin/amneziawg-go
COPY --from=bins /awg /usr/bin/awg
COPY --from=bins /awg-quick /usr/bin/awg-quick
# Declaring environment variables, change Peernet to an address you like, standard is a 24 bit subnet.
ARG wg_net="10.0.0.1" \
wg_port="51820"
# Following ENV variables are changable on container runtime because /entrypoint.sh handles that. See compose.yaml for more info.
ENV TZ="Europe/Amsterdam" \
global_dns="9.9.9.9" \
wgd_port="10086" \
public_ip=""
# Doing package management operations, such as upgrading
RUN apk update \
&& apk add --no-cache bash git tzdata \
iptables ip6tables openrc curl wireguard-tools \
sudo py3-psutil py3-bcrypt \
&& apk upgrade
# Using WGDASH -- like wg_net functionally as a ARG command. But it is needed in entrypoint.sh so it needs to be exported as environment variable.
ENV WGDASH=/opt/wgdashboard
# Removing the Linux Image package to preserve space on the image, for this reason also deleting apt lists, to be able to install packages: run apt update.
# Doing WireGuard Dashboard installation measures. Modify the git clone command to get the preferred version, with a specific branch for example.
RUN mkdir /data \
&& mkdir /configs \
&& mkdir -p ${WGDASH}/src \
&& mkdir -p /etc/amnezia/amneziawg
COPY ./src ${WGDASH}/src
# Generate basic WireGuard interface. Echoing the WireGuard interface config for readability, adjust if you want it for efficiency.
# Also setting the pipefail option, verbose: https://github.com/hadolint/hadolint/wiki/DL4006.
SHELL ["/bin/bash", "-o", "pipefail", "-c"]
RUN out_adapt=$(ip -o -4 route show to default | awk '{print $NF}') \
&& echo -e "[Interface]\n\
Address = ${wg_net}/24\n\
PrivateKey =\n\
PostUp = iptables -t nat -I POSTROUTING 1 -s ${wg_net}/24 -o ${out_adapt} -j MASQUERADE\n\
PostUp = iptables -I FORWARD -i wg0 -o wg0 -j DROP\n\
PreDown = iptables -t nat -D POSTROUTING -s ${wg_net}/24 -o ${out_adapt} -j MASQUERADE\n\
PreDown = iptables -D FORWARD -i wg0 -o wg0 -j DROP\n\
ListenPort = ${wg_port}\n\
SaveConfig = true\n\
DNS = ${global_dns}" > /configs/wg0.conf.template \
&& chmod 600 /configs/wg0.conf.template
# Defining a way for Docker to check the health of the container. In this case: checking the gunicorn process.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD sh -c 'pgrep gunicorn > /dev/null && pgrep tail > /dev/null' || exit 1
# Copy the basic entrypoint.sh script.
COPY ./docker/entrypoint.sh /entrypoint.sh
# Exposing the default WireGuard Dashboard port for web access.
EXPOSE 10086
WORKDIR $WGDASH
ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
```
## /docker/README.md
# WGDashboard Docker Explanation:
Author: @DaanSelen
This document delves into how the WGDashboard Docker container has been built.
Of course there are two stages (simply said), one before run-time and one at/after run-time.
The `Dockerfile` describes how the container image is made, and the `entrypoint.sh` is executed after running the container.
In this example, WireGuard is integrated into the container itself, so it should be a run-and-go(/out-of-the-box).
For more details on the source-code specific to this Docker image, refer to the source files, they have lots of comments.
To get the container running you either pull the image from the repository, (docker.io)`donaldzou/wgdashboard:latest`.
From there either use the environment variables describe below as parameters or use the Docker Compose file: `compose.yaml`.
Be careful, the default generated WireGuard configuration file uses port 51820/udp. So use this port if you want to use it out of the box.
Otherwise edit the configuration file in `/etc/wireguard/wg0.conf`.
# WGDashboard: 🐳 Docker Deployment Guide
To run the container, you can either pull the image from Docker Hub or build it yourself. The image is available at:
```
docker.io/donaldzou/wgdashboard:latest
```
> `docker.io` is in most cases automatically resolved by the Docker application.
### 🔧 Quick Docker Run Command
Here's an example to get it up and running quickly:
```bash
docker run -d \
--name wgdashboard \
--restart unless-stopped \
-p 10086:10086/tcp \
-p 51820:51820/udp \
--cap-add NET_ADMIN \
donaldzou/wgdashboard:latest
```
> ⚠️ The default WireGuard port is `51820/udp`. If you change this, update the `/etc/wireguard/wg0.conf` accordingly.
---
### 📦 Docker Compose Alternative
You can also use Docker Compose for easier configuration:
```yaml
services:
wgdashboard:
image: donaldzou/wgdashboard:latest
restart: unless-stopped
container_name: wgdashboard
environment:
# - tz=Europe/Amsterdam
# - global_dns=1.1.1.1
# - public_ip=YOUR_PUBLIC_IP
ports:
- 10086:10086/tcp
- 51820:51820/udp
volumes:
- conf:/etc/wireguard
- data:/data
cap_add:
- NET_ADMIN
volumes:
conf:
data:
```
> 📁 You can customize the **volume paths** on the host to fit your needs. The example above uses Docker volumes.
---
## 🔄 Updating the Container
Updating WGDashboard is currently in **alpha** stage. While the update process may work, it's still under testing.
---
## ⚙️ Environment Variables
| Variable | Accepted Values | Default | Example | Description |
|---------------|------------------------------------------|-------------------------|------------------------|-----------------------------------------------------------------------------|
| `tz` | Timezone | `Europe/Amsterdam` | `America/New_York` | Sets the container's timezone. Useful for accurate logs and scheduling. |
| `global_dns` | IPv4 and IPv6 addresses | `9.9.9.9` | `8.8.8.8`, `1.1.1.1` | Default DNS for WireGuard clients. |
| `public_ip` | Public IP address | Retrieved automatically | `253.162.134.73` | Used to generate accurate client configs. Needed if container is NAT’d. |
| `wgd_port` | Any port that is allowed for the process | `10086` | `443` | This port is used to set the WGDashboard web port. |
---
## 🔐 Port Forwarding Note
When using multiple WireGuard interfaces, remember to **open their respective ports** on the host.
Examples:
```yaml
# Individual mapping
- 51821:51821/udp
# Or port range
- 51820-51830:51820-51830/udp
```
> 🚨 **Security Tip:** Only expose ports you actually use.
---
## 🛠️ Building the Image Yourself
To build from source:
```bash
git clone https://github.com/donaldzou/WGDashboard.git
cd WGDashboard
docker build . -f docker/Dockerfile -t yourname/wgdashboard:latest
```
Example output:
```shell
docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
yourname/wgdashboard latest c96fd96ee3b3 42 minutes ago 314MB
```
---
## 🧱 Dockerfile Overview
Here's a brief overview of the Dockerfile stages used in the image build:
### 1. **Build Tools & Go Compilation**
```Dockerfile
FROM golang:1.24 AS compiler
WORKDIR /go
RUN apt-get update && apt-get install -y ...
RUN git clone ... && make
...
```
### 2. **Binary Copy to Scratch**
```Dockerfile
FROM scratch AS bins
COPY --from=compiler /go/amneziawg-go/amneziawg-go /amneziawg-go
...
```
### 3. **Final Alpine Container Setup**
```Dockerfile
FROM alpine:latest
COPY --from=bins ...
RUN apk update && apk add --no-cache ...
COPY ./src ${WGDASH}/src
COPY ./docker/entrypoint.sh /entrypoint.sh
...
EXPOSE 10086
ENTRYPOINT ["/bin/bash", "/entrypoint.sh"]
```
---
## 🚀 Entrypoint Overview
### Major Functions:
- **`ensure_installation`**: Sets up the app, database, and Python environment.
- **`set_envvars`**: Writes `wg-dashboard.ini` and applies environment variables.
- **`start_core`**: Starts the main WGDashboard service.
- **`ensure_blocking`**: Tails the error log to keep the container process alive.
---
## ✅ Final Notes
- Use `docker logs wgdashboard` for troubleshooting.
- Access the web interface via `http://your-ip:10086` (or whichever port you specified in the compose).
- The first time run will auto-generate WireGuard keys and configs (configs are generated from the template).
## Closing remarks:
For feedback please submit an issue to the repository. Or message dselen@nerthus.nl.
## /docker/compose.yaml
```yaml path="/docker/compose.yaml"
services:
wireguard-dashboard:
image: donaldzou/wgdashboard:latest
restart: unless-stopped
container_name: wgdashboard
#environment:
#- tz= # <--- Set container timezone, default: Europe/Amsterdam.
#- public_ip= # <--- Set public IP to ensure the correct one is chosen, defaulting to the IP give by ifconfig.me.
#- wgd_port= # <--- Set the port WGDashboard will use for its web-server.
ports:
- 10086:10086/tcp
- 51820:51820/udp
volumes:
- conf:/etc/wireguard
- data:/data
cap_add:
- NET_ADMIN
volumes:
conf:
data:
```
## /docker/entrypoint.sh
```sh path="/docker/entrypoint.sh"
#!/bin/bash
# Path to the configuration file (exists because of previous function).
config_file="/data/wg-dashboard.ini"
trap 'stop_service' SIGTERM
stop_service() {
echo "[WGDashboard] Stopping WGDashboard..."
bash ./wgd.sh stop
exit 0
}
echo "------------------------- START ----------------------------"
echo "Starting the WireGuard Dashboard Docker container."
ensure_installation() {
# When using a custom directory to store the files, this part moves over and makes sure the installation continues.
echo "Quick-installing..."
chmod +x "${WGDASH}"/src/wgd.sh
cd "${WGDASH}"/src || exit
echo "Removing clear command from wgd.sh for better Docker logging."
sed -i '/clear/d' ./wgd.sh
if [ ! -d "/data/db" ]; then
echo "Creating database dir"
mkdir /data/db
fi
if [ ! -d "${WGDASH}/src/db" ]; then
ln -s /data/db "${WGDASH}/src/db"
fi
if [ ! -f "${config_file}" ]; then
echo "Creating wg-dashboard.ini file"
touch "${config_file}"
fi
if [ ! -f "${WGDASH}/src/wg-dashboard.ini" ]; then
ln -s "${config_file}" "${WGDASH}/src/wg-dashboard.ini"
fi
python3 -m venv "${WGDASH}"/src/venv
. "${WGDASH}/src/venv/bin/activate"
echo "Moving PIP dependency from ephemerality to runtime environment: psutil"
mv /usr/lib/python3.12/site-packages/psutil* "${WGDASH}"/src/venv/lib/python3.12/site-packages
echo "Moving PIP dependency from ephemerality to runtime environment: bcrypt"
mv /usr/lib/python3.12/site-packages/bcrypt* "${WGDASH}"/src/venv/lib/python3.12/site-packages
./wgd.sh install
echo "Looks like the installation succeeded. Moving on."
# This first step is to ensure the wg0.conf file exists, and if not, then its copied over from the ephemeral container storage.
# This is done so WGDashboard it works out of the box
if [ ! -f "/etc/wireguard/wg0.conf" ]; then
echo "Standard wg0 Configuration file not found, grabbing template."
cp -a "/configs/wg0.conf.template" "/etc/wireguard/wg0.conf"
echo "Setting a secure private key." # SORRY 4 BE4 - Daan
local privateKey
privateKey=$(wg genkey)
sed -i "s|^PrivateKey *=.*$|PrivateKey = ${privateKey}|g" /etc/wireguard/wg0.conf
echo "Done setting template."
else
echo "Existing wg0 configuration file found, using that."
fi
}
set_envvars() {
printf "\n------------- SETTING ENVIRONMENT VARIABLES ----------------\n"
# Check if the file is empty
if [ ! -s "${config_file}" ]; then
echo "Config file is empty. Creating [Peers] section."
# Create [Peers] section with initial values
{
echo "[Peers]"
echo "peer_global_dns = ${global_dns}"
echo "remote_endpoint = ${public_ip}"
echo -e "\n[Server]"
echo "app_port = ${wgd_port}"
} > "${config_file}"
else
echo "Config file is not empty, using pre-existing."
fi
echo "Verifying current variables..."
# Check and update the DNS if it has changed
current_dns=$(grep "peer_global_dns = " "${config_file}" | awk '{print $NF}')
if [ "${global_dns}" == "$current_dns" ]; then
echo "DNS is set correctly, moving on."
else
echo "Changing default DNS..."
sed -i "s/^peer_global_dns = .*/peer_global_dns = ${global_dns}/" "${config_file}"
fi
current_public_ip=$(grep "remote_endpoint = " "${config_file}" | awk '{print $NF}')
if [ "${public_ip}" == "" ]; then
default_ip=$(curl -s ifconfig.me)
echo "Trying to fetch the Public-IP using ifconfig.me: ${default_ip}"
sed -i "s/^remote_endpoint = .*/remote_endpoint = ${default_ip}/" "${config_file}"
elif [ "${current_public_ip}" != "${public_ip}" ]; then
sed -i "s/^remote_endpoint = .*/remote_endpoint = ${public_ip}/" "${config_file}"
else
echo "Public-IP is correct, moving on."
fi
current_wgd_port=$(grep "app_port = " "${config_file}" | awk '{print $NF}')
if [ "${current_wgd_port}" == "${wgd_port}" ]; then
echo "Current WGD port is set correctly, moving on."
else
echo "Changing default WGD port..."
sed -i "s/^app_port = .*/app_port = ${wgd_port}/" "${config_file}"
fi
}
# === CORE SERVICES ===
start_core() {
printf "\n---------------------- STARTING CORE -----------------------\n"
echo "Activating Python venv and executing the WireGuard Dashboard service."
bash ./wgd.sh start
}
ensure_blocking() {
sleep 1s
echo -e "\nEnsuring container continuation."
# Find and tail the latest error and access logs if they exist
local logdir="${WGDASH}/src/log"
latestErrLog=$(find "$logdir" -name "error_*.log" -type f -print | sort -r | head -n 1)
# Only tail the logs if they are found
if [ -n "$latestErrLog" ]; then
tail -f "$latestErrLog" &
wait $!
else
echo "No log files found to tail. Something went wrong, exiting..."
fi
}
# Execute functions for the WireGuard Dashboard services, then set the environment variables
ensure_installation
set_envvars
start_core
ensure_blocking
```
## /package-lock.json
```json path="/package-lock.json"
{
"name": "Wireguard-Dashboard",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"dependencies": {
"marked": "^15.0.7",
"openai": "^4.89.0",
"pinia-plugin-persistedstate": "^4.2.0"
}
},
"node_modules/@jridgewell/sourcemap-codec": {
"version": "1.5.0",
"resolved": "https://registry.npmmirror.com/@jridgewell/sourcemap-codec/-/sourcemap-codec-1.5.0.tgz",
"integrity": "sha512-gv3ZRaISU3fjPAgNsriBRqGWQL6quFx04YMPW/zD8XMLsU32mhCCbfbO6KZFLjvYpCZ8zyDEgqsgf+PwPaM7GQ==",
"license": "MIT"
},
"node_modules/@nodelib/fs.scandir": {
"version": "2.1.5",
"resolved": "https://registry.npmmirror.com/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz",
"integrity": "sha512-vq24Bq3ym5HEQm2NKCr3yXDwjc7vTsEThRDnkp2DK9p1uqLR+DHurm/NOTo0KG7HYHU7eppKZj3MyqYuMBf62g==",
"license": "MIT",
"dependencies": {
"@nodelib/fs.stat": "2.0.5",
"run-parallel": "^1.1.9"
},
"engines": {
"node": ">= 8"
}
},
"node_modules/@nodelib/fs.stat": {
"version": "2.0.5",
"resolved": "https://registry.npmmirror.com/@nodelib/fs.stat/-/fs.stat-2.0.5.tgz",
"integrity": "sha512-RkhPPp2zrqDAQA/2jNhnztcPAlv64XdhIp7a7454A5ovI7Bukxgt7MX7udwAu3zg1DcpPU0rz3VV1SeaqvY4+A==",
"license": "MIT",
"engines": {
"node": ">= 8"
}
},
"node_modules/@nodelib/fs.walk": {
"version": "1.2.8",
"resolved": "https://registry.npmmirror.com/@nodelib/fs.walk/-/fs.walk-1.2.8.tgz",
"integrity": "sha512-oGB+UxlgWcgQkgwo8GcEGwemoTFt3FIO9ababBmaGwXIoBKZ+GTy0pP185beGg7Llih/NSHSV2XAs1lnznocSg==",
"license": "MIT",
"dependencies": {
"@nodelib/fs.scandir": "2.1.5",
"fastq": "^1.6.0"
},
"engines": {
"node": ">= 8"
}
},
"node_modules/@nuxt/kit": {
"version": "3.16.1",
"resolved": "https://registry.npmmirror.com/@nuxt/kit/-/kit-3.16.1.tgz",
"integrity": "sha512-Perby8hJGUeCWad5oTVXb/Ibvp18ZCUC5PxHHu+acMDmVfnxSo48yqk7qNd09VkTF3LEzoEjNZpmW2ZWN0ry7A==",
"license": "MIT",
"dependencies": {
"c12": "^3.0.2",
"consola": "^3.4.2",
"defu": "^6.1.4",
"destr": "^2.0.3",
"errx": "^0.1.0",
"exsolve": "^1.0.4",
"globby": "^14.1.0",
"ignore": "^7.0.3",
"jiti": "^2.4.2",
"klona": "^2.0.6",
"knitwork": "^1.2.0",
"mlly": "^1.7.4",
"ohash": "^2.0.11",
"pathe": "^2.0.3",
"pkg-types": "^2.1.0",
"scule": "^1.3.0",
"semver": "^7.7.1",
"std-env": "^3.8.1",
"ufo": "^1.5.4",
"unctx": "^2.4.1",
"unimport": "^4.1.2",
"untyped": "^2.0.0"
},
"engines": {
"node": ">=18.12.0"
}
},
"node_modules/@sindresorhus/merge-streams": {
"version": "2.3.0",
"resolved": "https://registry.npmmirror.com/@sindresorhus/merge-streams/-/merge-streams-2.3.0.tgz",
"integrity": "sha512-LtoMMhxAlorcGhmFYI+LhPgbPZCkgP6ra1YL604EeF6U98pLlQ3iWIGMdWSC+vWmPBWBNgmDBAhnAobLROJmwg==",
"license": "MIT",
"engines": {
"node": ">=18"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/@types/estree": {
"version": "1.0.7",
"resolved": "https://registry.npmmirror.com/@types/estree/-/estree-1.0.7.tgz",
"integrity": "sha512-w28IoSUCJpidD/TGviZwwMJckNESJZXFu7NBZ5YJ4mEUnNraUn9Pm8HSZm/jDF1pDWYKspWE7oVphigUPRakIQ==",
"license": "MIT"
},
"node_modules/@types/node": {
"version": "18.19.81",
"resolved": "https://registry.npmmirror.com/@types/node/-/node-18.19.81.tgz",
"integrity": "sha512-7KO9oZ2//ivtSsryp0LQUqq79zyGXzwq1WqfywpC9ucjY7YyltMMmxWgtRFRKCxwa7VPxVBVy4kHf5UC1E8Lug==",
"license": "MIT",
"dependencies": {
"undici-types": "~5.26.4"
}
},
"node_modules/@types/node-fetch": {
"version": "2.6.12",
"resolved": "https://registry.npmmirror.com/@types/node-fetch/-/node-fetch-2.6.12.tgz",
"integrity": "sha512-8nneRWKCg3rMtF69nLQJnOYUcbafYeFSjqkw3jCRLsqkWFlHaoQrr5mXmofFGOx3DKn7UfmBMyov8ySvLRVldA==",
"license": "MIT",
"dependencies": {
"@types/node": "*",
"form-data": "^4.0.0"
}
},
"node_modules/abort-controller": {
"version": "3.0.0",
"resolved": "https://registry.npmmirror.com/abort-controller/-/abort-controller-3.0.0.tgz",
"integrity": "sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg==",
"license": "MIT",
"dependencies": {
"event-target-shim": "^5.0.0"
},
"engines": {
"node": ">=6.5"
}
},
"node_modules/acorn": {
"version": "8.14.1",
"resolved": "https://registry.npmmirror.com/acorn/-/acorn-8.14.1.tgz",
"integrity": "sha512-OvQ/2pUDKmgfCg++xsTX1wGxfTaszcHVcTctW4UJB4hibJx2HXxxO5UmVgyjMa+ZDsiaf5wWLXYpRWMmBI0QHg==",
"license": "MIT",
"bin": {
"acorn": "bin/acorn"
},
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/agentkeepalive": {
"version": "4.6.0",
"resolved": "https://registry.npmmirror.com/agentkeepalive/-/agentkeepalive-4.6.0.tgz",
"integrity": "sha512-kja8j7PjmncONqaTsB8fQ+wE2mSU2DJ9D4XKoJ5PFWIdRMa6SLSN1ff4mOr4jCbfRSsxR4keIiySJU0N9T5hIQ==",
"license": "MIT",
"dependencies": {
"humanize-ms": "^1.2.1"
},
"engines": {
"node": ">= 8.0.0"
}
},
"node_modules/asynckit": {
"version": "0.4.0",
"resolved": "https://registry.npmmirror.com/asynckit/-/asynckit-0.4.0.tgz",
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==",
"license": "MIT"
},
"node_modules/braces": {
"version": "3.0.3",
"resolved": "https://registry.npmmirror.com/braces/-/braces-3.0.3.tgz",
"integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==",
"license": "MIT",
"dependencies": {
"fill-range": "^7.1.1"
},
"engines": {
"node": ">=8"
}
},
"node_modules/c12": {
"version": "3.0.2",
"resolved": "https://registry.npmmirror.com/c12/-/c12-3.0.2.tgz",
"integrity": "sha512-6Tzk1/TNeI3WBPpK0j/Ss4+gPj3PUJYbWl/MWDJBThFvwNGNkXtd7Cz8BJtD4aRwoGHtzQD0SnxamgUiBH0/Nw==",
"license": "MIT",
"dependencies": {
"chokidar": "^4.0.3",
"confbox": "^0.1.8",
"defu": "^6.1.4",
"dotenv": "^16.4.7",
"exsolve": "^1.0.0",
"giget": "^2.0.0",
"jiti": "^2.4.2",
"ohash": "^2.0.5",
"pathe": "^2.0.3",
"perfect-debounce": "^1.0.0",
"pkg-types": "^2.0.0",
"rc9": "^2.1.2"
},
"peerDependencies": {
"magicast": "^0.3.5"
},
"peerDependenciesMeta": {
"magicast": {
"optional": true
}
}
},
"node_modules/call-bind-apply-helpers": {
"version": "1.0.2",
"resolved": "https://registry.npmmirror.com/call-bind-apply-helpers/-/call-bind-apply-helpers-1.0.2.tgz",
"integrity": "sha512-Sp1ablJ0ivDkSzjcaJdxEunN5/XvksFJ2sMBFfq6x0ryhQV/2b/KwFe21cMpmHtPOSij8K99/wSfoEuTObmuMQ==",
"license": "MIT",
"dependencies": {
"es-errors": "^1.3.0",
"function-bind": "^1.1.2"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/chokidar": {
"version": "4.0.3",
"resolved": "https://registry.npmmirror.com/chokidar/-/chokidar-4.0.3.tgz",
"integrity": "sha512-Qgzu8kfBvo+cA4962jnP1KkS6Dop5NS6g7R5LFYJr4b8Ub94PPQXUksCw9PvXoeXPRRddRNC5C1JQUR2SMGtnA==",
"license": "MIT",
"dependencies": {
"readdirp": "^4.0.1"
},
"engines": {
"node": ">= 14.16.0"
},
"funding": {
"url": "https://paulmillr.com/funding/"
}
},
"node_modules/citty": {
"version": "0.1.6",
"resolved": "https://registry.npmmirror.com/citty/-/citty-0.1.6.tgz",
"integrity": "sha512-tskPPKEs8D2KPafUypv2gxwJP8h/OaJmC82QQGGDQcHvXX43xF2VDACcJVmZ0EuSxkpO9Kc4MlrA3q0+FG58AQ==",
"license": "MIT",
"dependencies": {
"consola": "^3.2.3"
}
},
"node_modules/combined-stream": {
"version": "1.0.8",
"resolved": "https://registry.npmmirror.com/combined-stream/-/combined-stream-1.0.8.tgz",
"integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==",
"license": "MIT",
"dependencies": {
"delayed-stream": "~1.0.0"
},
"engines": {
"node": ">= 0.8"
}
},
"node_modules/confbox": {
"version": "0.1.8",
"resolved": "https://registry.npmmirror.com/confbox/-/confbox-0.1.8.tgz",
"integrity": "sha512-RMtmw0iFkeR4YV+fUOSucriAQNb9g8zFR52MWCtl+cCZOFRNL6zeB395vPzFhEjjn4fMxXudmELnl/KF/WrK6w==",
"license": "MIT"
},
"node_modules/consola": {
"version": "3.4.2",
"resolved": "https://registry.npmmirror.com/consola/-/consola-3.4.2.tgz",
"integrity": "sha512-5IKcdX0nnYavi6G7TtOhwkYzyjfJlatbjMjuLSfE2kYT5pMDOilZ4OvMhi637CcDICTmz3wARPoyhqyX1Y+XvA==",
"license": "MIT",
"engines": {
"node": "^14.18.0 || >=16.10.0"
}
},
"node_modules/deep-pick-omit": {
"version": "1.2.1",
"resolved": "https://registry.npmmirror.com/deep-pick-omit/-/deep-pick-omit-1.2.1.tgz",
"integrity": "sha512-2J6Kc/m3irCeqVG42T+SaUMesaK7oGWaedGnQQK/+O0gYc+2SP5bKh/KKTE7d7SJ+GCA9UUE1GRzh6oDe0EnGw==",
"license": "MIT"
},
"node_modules/defu": {
"version": "6.1.4",
"resolved": "https://registry.npmmirror.com/defu/-/defu-6.1.4.tgz",
"integrity": "sha512-mEQCMmwJu317oSz8CwdIOdwf3xMif1ttiM8LTufzc3g6kR+9Pe236twL8j3IYT1F7GfRgGcW6MWxzZjLIkuHIg==",
"license": "MIT"
},
"node_modules/delayed-stream": {
"version": "1.0.0",
"resolved": "https://registry.npmmirror.com/delayed-stream/-/delayed-stream-1.0.0.tgz",
"integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==",
"license": "MIT",
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/destr": {
"version": "2.0.3",
"resolved": "https://registry.npmmirror.com/destr/-/destr-2.0.3.tgz",
"integrity": "sha512-2N3BOUU4gYMpTP24s5rF5iP7BDr7uNTCs4ozw3kf/eKfvWSIu93GEBi5m427YoyJoeOzQ5smuu4nNAPGb8idSQ==",
"license": "MIT"
},
"node_modules/dotenv": {
"version": "16.4.7",
"resolved": "https://registry.npmmirror.com/dotenv/-/dotenv-16.4.7.tgz",
"integrity": "sha512-47qPchRCykZC03FhkYAhrvwU4xDBFIj1QPqaarj6mdM/hgUzfPHcpkHJOn3mJAufFeeAxAzeGsr5X0M4k6fLZQ==",
"license": "BSD-2-Clause",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://dotenvx.com"
}
},
"node_modules/dunder-proto": {
"version": "1.0.1",
"resolved": "https://registry.npmmirror.com/dunder-proto/-/dunder-proto-1.0.1.tgz",
"integrity": "sha512-KIN/nDJBQRcXw0MLVhZE9iQHmG68qAVIBg9CqmUYjmQIhgij9U5MFvrqkUL5FbtyyzZuOeOt0zdeRe4UY7ct+A==",
"license": "MIT",
"dependencies": {
"call-bind-apply-helpers": "^1.0.1",
"es-errors": "^1.3.0",
"gopd": "^1.2.0"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/errx": {
"version": "0.1.0",
"resolved": "https://registry.npmmirror.com/errx/-/errx-0.1.0.tgz",
"integrity": "sha512-fZmsRiDNv07K6s2KkKFTiD2aIvECa7++PKyD5NC32tpRw46qZA3sOz+aM+/V9V0GDHxVTKLziveV4JhzBHDp9Q==",
"license": "MIT"
},
"node_modules/es-define-property": {
"version": "1.0.1",
"resolved": "https://registry.npmmirror.com/es-define-property/-/es-define-property-1.0.1.tgz",
"integrity": "sha512-e3nRfgfUZ4rNGL232gUgX06QNyyez04KdjFrF+LTRoOXmrOgFKDg4BCdsjW8EnT69eqdYGmRpJwiPVYNrCaW3g==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
}
},
"node_modules/es-errors": {
"version": "1.3.0",
"resolved": "https://registry.npmmirror.com/es-errors/-/es-errors-1.3.0.tgz",
"integrity": "sha512-Zf5H2Kxt2xjTvbJvP2ZWLEICxA6j+hAmMzIlypy4xcBg1vKVnx89Wy0GbS+kf5cwCVFFzdCFh2XSCFNULS6csw==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
}
},
"node_modules/es-object-atoms": {
"version": "1.1.1",
"resolved": "https://registry.npmmirror.com/es-object-atoms/-/es-object-atoms-1.1.1.tgz",
"integrity": "sha512-FGgH2h8zKNim9ljj7dankFPcICIK9Cp5bm+c2gQSYePhpaG5+esrLODihIorn+Pe6FGJzWhXQotPv73jTaldXA==",
"license": "MIT",
"dependencies": {
"es-errors": "^1.3.0"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/es-set-tostringtag": {
"version": "2.1.0",
"resolved": "https://registry.npmmirror.com/es-set-tostringtag/-/es-set-tostringtag-2.1.0.tgz",
"integrity": "sha512-j6vWzfrGVfyXxge+O0x5sh6cvxAog0a/4Rdd2K36zCMV5eJ+/+tOAngRO8cODMNWbVRdVlmGZQL2YS3yR8bIUA==",
"license": "MIT",
"dependencies": {
"es-errors": "^1.3.0",
"get-intrinsic": "^1.2.6",
"has-tostringtag": "^1.0.2",
"hasown": "^2.0.2"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/escape-string-regexp": {
"version": "5.0.0",
"resolved": "https://registry.npmmirror.com/escape-string-regexp/-/escape-string-regexp-5.0.0.tgz",
"integrity": "sha512-/veY75JbMK4j1yjvuUxuVsiS/hr/4iHs9FTT6cgTexxdE0Ly/glccBAkloH/DofkjRbZU3bnoj38mOmhkZ0lHw==",
"license": "MIT",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/estree-walker": {
"version": "3.0.3",
"resolved": "https://registry.npmmirror.com/estree-walker/-/estree-walker-3.0.3.tgz",
"integrity": "sha512-7RUKfXgSMMkzt6ZuXmqapOurLGPPfgj6l9uRZ7lRGolvk0y2yocc35LdcxKC5PQZdn2DMqioAQ2NoWcrTKmm6g==",
"license": "MIT",
"dependencies": {
"@types/estree": "^1.0.0"
}
},
"node_modules/event-target-shim": {
"version": "5.0.1",
"resolved": "https://registry.npmmirror.com/event-target-shim/-/event-target-shim-5.0.1.tgz",
"integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==",
"license": "MIT",
"engines": {
"node": ">=6"
}
},
"node_modules/exsolve": {
"version": "1.0.4",
"resolved": "https://registry.npmmirror.com/exsolve/-/exsolve-1.0.4.tgz",
"integrity": "sha512-xsZH6PXaER4XoV+NiT7JHp1bJodJVT+cxeSH1G0f0tlT0lJqYuHUP3bUx2HtfTDvOagMINYp8rsqusxud3RXhw==",
"license": "MIT"
},
"node_modules/fast-glob": {
"version": "3.3.3",
"resolved": "https://registry.npmmirror.com/fast-glob/-/fast-glob-3.3.3.tgz",
"integrity": "sha512-7MptL8U0cqcFdzIzwOTHoilX9x5BrNqye7Z/LuC7kCMRio1EMSyqRK3BEAUD7sXRq4iT4AzTVuZdhgQ2TCvYLg==",
"license": "MIT",
"dependencies": {
"@nodelib/fs.stat": "^2.0.2",
"@nodelib/fs.walk": "^1.2.3",
"glob-parent": "^5.1.2",
"merge2": "^1.3.0",
"micromatch": "^4.0.8"
},
"engines": {
"node": ">=8.6.0"
}
},
"node_modules/fastq": {
"version": "1.19.1",
"resolved": "https://registry.npmmirror.com/fastq/-/fastq-1.19.1.tgz",
"integrity": "sha512-GwLTyxkCXjXbxqIhTsMI2Nui8huMPtnxg7krajPJAjnEG/iiOS7i+zCtWGZR9G0NBKbXKh6X9m9UIsYX/N6vvQ==",
"license": "ISC",
"dependencies": {
"reusify": "^1.0.4"
}
},
"node_modules/fill-range": {
"version": "7.1.1",
"resolved": "https://registry.npmmirror.com/fill-range/-/fill-range-7.1.1.tgz",
"integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==",
"license": "MIT",
"dependencies": {
"to-regex-range": "^5.0.1"
},
"engines": {
"node": ">=8"
}
},
"node_modules/form-data": {
"version": "4.0.2",
"resolved": "https://registry.npmmirror.com/form-data/-/form-data-4.0.2.tgz",
"integrity": "sha512-hGfm/slu0ZabnNt4oaRZ6uREyfCj6P4fT/n6A1rGV+Z0VdGXjfOhVUpkn6qVQONHGIFwmveGXyDs75+nr6FM8w==",
"license": "MIT",
"dependencies": {
"asynckit": "^0.4.0",
"combined-stream": "^1.0.8",
"es-set-tostringtag": "^2.1.0",
"mime-types": "^2.1.12"
},
"engines": {
"node": ">= 6"
}
},
"node_modules/form-data-encoder": {
"version": "1.7.2",
"resolved": "https://registry.npmmirror.com/form-data-encoder/-/form-data-encoder-1.7.2.tgz",
"integrity": "sha512-qfqtYan3rxrnCk1VYaA4H+Ms9xdpPqvLZa6xmMgFvhO32x7/3J/ExcTd6qpxM0vH2GdMI+poehyBZvqfMTto8A==",
"license": "MIT"
},
"node_modules/formdata-node": {
"version": "4.4.1",
"resolved": "https://registry.npmmirror.com/formdata-node/-/formdata-node-4.4.1.tgz",
"integrity": "sha512-0iirZp3uVDjVGt9p49aTaqjk84TrglENEDuqfdlZQ1roC9CWlPk6Avf8EEnZNcAqPonwkG35x4n3ww/1THYAeQ==",
"license": "MIT",
"dependencies": {
"node-domexception": "1.0.0",
"web-streams-polyfill": "4.0.0-beta.3"
},
"engines": {
"node": ">= 12.20"
}
},
"node_modules/function-bind": {
"version": "1.1.2",
"resolved": "https://registry.npmmirror.com/function-bind/-/function-bind-1.1.2.tgz",
"integrity": "sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==",
"license": "MIT",
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/get-intrinsic": {
"version": "1.3.0",
"resolved": "https://registry.npmmirror.com/get-intrinsic/-/get-intrinsic-1.3.0.tgz",
"integrity": "sha512-9fSjSaos/fRIVIp+xSJlE6lfwhES7LNtKaCBIamHsjr2na1BiABJPo0mOjjz8GJDURarmCPGqaiVg5mfjb98CQ==",
"license": "MIT",
"dependencies": {
"call-bind-apply-helpers": "^1.0.2",
"es-define-property": "^1.0.1",
"es-errors": "^1.3.0",
"es-object-atoms": "^1.1.1",
"function-bind": "^1.1.2",
"get-proto": "^1.0.1",
"gopd": "^1.2.0",
"has-symbols": "^1.1.0",
"hasown": "^2.0.2",
"math-intrinsics": "^1.1.0"
},
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/get-proto": {
"version": "1.0.1",
"resolved": "https://registry.npmmirror.com/get-proto/-/get-proto-1.0.1.tgz",
"integrity": "sha512-sTSfBjoXBp89JvIKIefqw7U2CCebsc74kiY6awiGogKtoSGbgjYE/G/+l9sF3MWFPNc9IcoOC4ODfKHfxFmp0g==",
"license": "MIT",
"dependencies": {
"dunder-proto": "^1.0.1",
"es-object-atoms": "^1.0.0"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/giget": {
"version": "2.0.0",
"resolved": "https://registry.npmmirror.com/giget/-/giget-2.0.0.tgz",
"integrity": "sha512-L5bGsVkxJbJgdnwyuheIunkGatUF/zssUoxxjACCseZYAVbaqdh9Tsmmlkl8vYan09H7sbvKt4pS8GqKLBrEzA==",
"license": "MIT",
"dependencies": {
"citty": "^0.1.6",
"consola": "^3.4.0",
"defu": "^6.1.4",
"node-fetch-native": "^1.6.6",
"nypm": "^0.6.0",
"pathe": "^2.0.3"
},
"bin": {
"giget": "dist/cli.mjs"
}
},
"node_modules/glob-parent": {
"version": "5.1.2",
"resolved": "https://registry.npmmirror.com/glob-parent/-/glob-parent-5.1.2.tgz",
"integrity": "sha512-AOIgSQCepiJYwP3ARnGx+5VnTu2HBYdzbGP45eLw1vr3zB3vZLeyed1sC9hnbcOc9/SrMyM5RPQrkGz4aS9Zow==",
"license": "ISC",
"dependencies": {
"is-glob": "^4.0.1"
},
"engines": {
"node": ">= 6"
}
},
"node_modules/globby": {
"version": "14.1.0",
"resolved": "https://registry.npmmirror.com/globby/-/globby-14.1.0.tgz",
"integrity": "sha512-0Ia46fDOaT7k4og1PDW4YbodWWr3scS2vAr2lTbsplOt2WkKp0vQbkI9wKis/T5LV/dqPjO3bpS/z6GTJB82LA==",
"license": "MIT",
"dependencies": {
"@sindresorhus/merge-streams": "^2.1.0",
"fast-glob": "^3.3.3",
"ignore": "^7.0.3",
"path-type": "^6.0.0",
"slash": "^5.1.0",
"unicorn-magic": "^0.3.0"
},
"engines": {
"node": ">=18"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/gopd": {
"version": "1.2.0",
"resolved": "https://registry.npmmirror.com/gopd/-/gopd-1.2.0.tgz",
"integrity": "sha512-ZUKRh6/kUFoAiTAtTYPZJ3hw9wNxx+BIBOijnlG9PnrJsCcSjs1wyyD6vJpaYtgnzDrKYRSqf3OO6Rfa93xsRg==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/has-symbols": {
"version": "1.1.0",
"resolved": "https://registry.npmmirror.com/has-symbols/-/has-symbols-1.1.0.tgz",
"integrity": "sha512-1cDNdwJ2Jaohmb3sg4OmKaMBwuC48sYni5HUw2DvsC8LjGTLK9h+eb1X6RyuOHe4hT0ULCW68iomhjUoKUqlPQ==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/has-tostringtag": {
"version": "1.0.2",
"resolved": "https://registry.npmmirror.com/has-tostringtag/-/has-tostringtag-1.0.2.tgz",
"integrity": "sha512-NqADB8VjPFLM2V0VvHUewwwsw0ZWBaIdgo+ieHtK3hasLz4qeCRjYcqfB6AQrBggRKppKF8L52/VqdVsO47Dlw==",
"license": "MIT",
"dependencies": {
"has-symbols": "^1.0.3"
},
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/hasown": {
"version": "2.0.2",
"resolved": "https://registry.npmmirror.com/hasown/-/hasown-2.0.2.tgz",
"integrity": "sha512-0hJU9SCPvmMzIBdZFqNPXWa6dqh7WdH0cII9y+CyS8rG3nL48Bclra9HmKhVVUHyPWNH5Y7xDwAB7bfgSjkUMQ==",
"license": "MIT",
"dependencies": {
"function-bind": "^1.1.2"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/humanize-ms": {
"version": "1.2.1",
"resolved": "https://registry.npmmirror.com/humanize-ms/-/humanize-ms-1.2.1.tgz",
"integrity": "sha512-Fl70vYtsAFb/C06PTS9dZBo7ihau+Tu/DNCk/OyHhea07S+aeMWpFFkUaXRa8fI+ScZbEI8dfSxwY7gxZ9SAVQ==",
"license": "MIT",
"dependencies": {
"ms": "^2.0.0"
}
},
"node_modules/ignore": {
"version": "7.0.3",
"resolved": "https://registry.npmmirror.com/ignore/-/ignore-7.0.3.tgz",
"integrity": "sha512-bAH5jbK/F3T3Jls4I0SO1hmPR0dKU0a7+SY6n1yzRtG54FLO8d6w/nxLFX2Nb7dBu6cCWXPaAME6cYqFUMmuCA==",
"license": "MIT",
"engines": {
"node": ">= 4"
}
},
"node_modules/is-extglob": {
"version": "2.1.1",
"resolved": "https://registry.npmmirror.com/is-extglob/-/is-extglob-2.1.1.tgz",
"integrity": "sha512-SbKbANkN603Vi4jEZv49LeVJMn4yGwsbzZworEoyEiutsN3nJYdbO36zfhGJ6QEDpOZIFkDtnq5JRxmvl3jsoQ==",
"license": "MIT",
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/is-glob": {
"version": "4.0.3",
"resolved": "https://registry.npmmirror.com/is-glob/-/is-glob-4.0.3.tgz",
"integrity": "sha512-xelSayHH36ZgE7ZWhli7pW34hNbNl8Ojv5KVmkJD4hBdD3th8Tfk9vYasLM+mXWOZhFkgZfxhLSnrwRr4elSSg==",
"license": "MIT",
"dependencies": {
"is-extglob": "^2.1.1"
},
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/is-number": {
"version": "7.0.0",
"resolved": "https://registry.npmmirror.com/is-number/-/is-number-7.0.0.tgz",
"integrity": "sha512-41Cifkg6e8TylSpdtTpeLVMqvSBEVzTttHvERD741+pnZ8ANv0004MRL43QKPDlK9cGvNp6NZWZUBlbGXYxxng==",
"license": "MIT",
"engines": {
"node": ">=0.12.0"
}
},
"node_modules/jiti": {
"version": "2.4.2",
"resolved": "https://registry.npmmirror.com/jiti/-/jiti-2.4.2.tgz",
"integrity": "sha512-rg9zJN+G4n2nfJl5MW3BMygZX56zKPNVEYYqq7adpmMh4Jn2QNEwhvQlFy6jPVdcod7txZtKHWnyZiA3a0zP7A==",
"license": "MIT",
"bin": {
"jiti": "lib/jiti-cli.mjs"
}
},
"node_modules/js-tokens": {
"version": "9.0.1",
"resolved": "https://registry.npmmirror.com/js-tokens/-/js-tokens-9.0.1.tgz",
"integrity": "sha512-mxa9E9ITFOt0ban3j6L5MpjwegGz6lBQmM1IJkWeBZGcMxto50+eWdjC/52xDbS2vy0k7vIMK0Fe2wfL9OQSpQ==",
"license": "MIT"
},
"node_modules/klona": {
"version": "2.0.6",
"resolved": "https://registry.npmmirror.com/klona/-/klona-2.0.6.tgz",
"integrity": "sha512-dhG34DXATL5hSxJbIexCft8FChFXtmskoZYnoPWjXQuebWYCNkVeV3KkGegCK9CP1oswI/vQibS2GY7Em/sJJA==",
"license": "MIT",
"engines": {
"node": ">= 8"
}
},
"node_modules/knitwork": {
"version": "1.2.0",
"resolved": "https://registry.npmmirror.com/knitwork/-/knitwork-1.2.0.tgz",
"integrity": "sha512-xYSH7AvuQ6nXkq42x0v5S8/Iry+cfulBz/DJQzhIyESdLD7425jXsPy4vn5cCXU+HhRN2kVw51Vd1K6/By4BQg==",
"license": "MIT"
},
"node_modules/local-pkg": {
"version": "1.1.1",
"resolved": "https://registry.npmmirror.com/local-pkg/-/local-pkg-1.1.1.tgz",
"integrity": "sha512-WunYko2W1NcdfAFpuLUoucsgULmgDBRkdxHxWQ7mK0cQqwPiy8E1enjuRBrhLtZkB5iScJ1XIPdhVEFK8aOLSg==",
"license": "MIT",
"dependencies": {
"mlly": "^1.7.4",
"pkg-types": "^2.0.1",
"quansync": "^0.2.8"
},
"engines": {
"node": ">=14"
},
"funding": {
"url": "https://github.com/sponsors/antfu"
}
},
"node_modules/magic-string": {
"version": "0.30.17",
"resolved": "https://registry.npmmirror.com/magic-string/-/magic-string-0.30.17.tgz",
"integrity": "sha512-sNPKHvyjVf7gyjwS4xGTaW/mCnF8wnjtifKBEhxfZ7E/S8tQ0rssrwGNn6q8JH/ohItJfSQp9mBtQYuTlH5QnA==",
"license": "MIT",
"dependencies": {
"@jridgewell/sourcemap-codec": "^1.5.0"
}
},
"node_modules/marked": {
"version": "15.0.7",
"resolved": "https://registry.npmmirror.com/marked/-/marked-15.0.7.tgz",
"integrity": "sha512-dgLIeKGLx5FwziAnsk4ONoGwHwGPJzselimvlVskE9XLN4Orv9u2VA3GWw/lYUqjfA0rUT/6fqKwfZJapP9BEg==",
"license": "MIT",
"bin": {
"marked": "bin/marked.js"
},
"engines": {
"node": ">= 18"
}
},
"node_modules/math-intrinsics": {
"version": "1.1.0",
"resolved": "https://registry.npmmirror.com/math-intrinsics/-/math-intrinsics-1.1.0.tgz",
"integrity": "sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
}
},
"node_modules/merge2": {
"version": "1.4.1",
"resolved": "https://registry.npmmirror.com/merge2/-/merge2-1.4.1.tgz",
"integrity": "sha512-8q7VEgMJW4J8tcfVPy8g09NcQwZdbwFEqhe/WZkoIzjn/3TGDwtOCYtXGxA3O8tPzpczCCDgv+P2P5y00ZJOOg==",
"license": "MIT",
"engines": {
"node": ">= 8"
}
},
"node_modules/micromatch": {
"version": "4.0.8",
"resolved": "https://registry.npmmirror.com/micromatch/-/micromatch-4.0.8.tgz",
"integrity": "sha512-PXwfBhYu0hBCPw8Dn0E+WDYb7af3dSLVWKi3HGv84IdF4TyFoC0ysxFd0Goxw7nSv4T/PzEJQxsYsEiFCKo2BA==",
"license": "MIT",
"dependencies": {
"braces": "^3.0.3",
"picomatch": "^2.3.1"
},
"engines": {
"node": ">=8.6"
}
},
"node_modules/mime-db": {
"version": "1.52.0",
"resolved": "https://registry.npmmirror.com/mime-db/-/mime-db-1.52.0.tgz",
"integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==",
"license": "MIT",
"engines": {
"node": ">= 0.6"
}
},
"node_modules/mime-types": {
"version": "2.1.35",
"resolved": "https://registry.npmmirror.com/mime-types/-/mime-types-2.1.35.tgz",
"integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==",
"license": "MIT",
"dependencies": {
"mime-db": "1.52.0"
},
"engines": {
"node": ">= 0.6"
}
},
"node_modules/mlly": {
"version": "1.7.4",
"resolved": "https://registry.npmmirror.com/mlly/-/mlly-1.7.4.tgz",
"integrity": "sha512-qmdSIPC4bDJXgZTCR7XosJiNKySV7O215tsPtDN9iEO/7q/76b/ijtgRu/+epFXSJhijtTCCGp3DWS549P3xKw==",
"license": "MIT",
"dependencies": {
"acorn": "^8.14.0",
"pathe": "^2.0.1",
"pkg-types": "^1.3.0",
"ufo": "^1.5.4"
}
},
"node_modules/mlly/node_modules/pkg-types": {
"version": "1.3.1",
"resolved": "https://registry.npmmirror.com/pkg-types/-/pkg-types-1.3.1.tgz",
"integrity": "sha512-/Jm5M4RvtBFVkKWRu2BLUTNP8/M2a+UwuAX+ae4770q1qVGtfjG+WTCupoZixokjmHiry8uI+dlY8KXYV5HVVQ==",
"license": "MIT",
"dependencies": {
"confbox": "^0.1.8",
"mlly": "^1.7.4",
"pathe": "^2.0.1"
}
},
"node_modules/ms": {
"version": "2.1.3",
"resolved": "https://registry.npmmirror.com/ms/-/ms-2.1.3.tgz",
"integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==",
"license": "MIT"
},
"node_modules/node-domexception": {
"version": "1.0.0",
"resolved": "https://registry.npmmirror.com/node-domexception/-/node-domexception-1.0.0.tgz",
"integrity": "sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/jimmywarting"
},
{
"type": "github",
"url": "https://paypal.me/jimmywarting"
}
],
"license": "MIT",
"engines": {
"node": ">=10.5.0"
}
},
"node_modules/node-fetch": {
"version": "2.7.0",
"resolved": "https://registry.npmmirror.com/node-fetch/-/node-fetch-2.7.0.tgz",
"integrity": "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==",
"license": "MIT",
"dependencies": {
"whatwg-url": "^5.0.0"
},
"engines": {
"node": "4.x || >=6.0.0"
},
"peerDependencies": {
"encoding": "^0.1.0"
},
"peerDependenciesMeta": {
"encoding": {
"optional": true
}
}
},
"node_modules/node-fetch-native": {
"version": "1.6.6",
"resolved": "https://registry.npmmirror.com/node-fetch-native/-/node-fetch-native-1.6.6.tgz",
"integrity": "sha512-8Mc2HhqPdlIfedsuZoc3yioPuzp6b+L5jRCRY1QzuWZh2EGJVQrGppC6V6cF0bLdbW0+O2YpqCA25aF/1lvipQ==",
"license": "MIT"
},
"node_modules/nypm": {
"version": "0.6.0",
"resolved": "https://registry.npmmirror.com/nypm/-/nypm-0.6.0.tgz",
"integrity": "sha512-mn8wBFV9G9+UFHIrq+pZ2r2zL4aPau/by3kJb3cM7+5tQHMt6HGQB8FDIeKFYp8o0D2pnH6nVsO88N4AmUxIWg==",
"license": "MIT",
"dependencies": {
"citty": "^0.1.6",
"consola": "^3.4.0",
"pathe": "^2.0.3",
"pkg-types": "^2.0.0",
"tinyexec": "^0.3.2"
},
"bin": {
"nypm": "dist/cli.mjs"
},
"engines": {
"node": "^14.16.0 || >=16.10.0"
}
},
"node_modules/ohash": {
"version": "2.0.11",
"resolved": "https://registry.npmmirror.com/ohash/-/ohash-2.0.11.tgz",
"integrity": "sha512-RdR9FQrFwNBNXAr4GixM8YaRZRJ5PUWbKYbE5eOsrwAjJW0q2REGcf79oYPsLyskQCZG1PLN+S/K1V00joZAoQ==",
"license": "MIT"
},
"node_modules/openai": {
"version": "4.89.0",
"resolved": "https://registry.npmmirror.com/openai/-/openai-4.89.0.tgz",
"integrity": "sha512-XNI0q2l8/Os6jmojxaID5EhyQjxZgzR2gWcpEjYWK5hGKwE7AcifxEY7UNwFDDHJQXqeiosQ0CJwQN+rvnwdjA==",
"license": "Apache-2.0",
"dependencies": {
"@types/node": "^18.11.18",
"@types/node-fetch": "^2.6.4",
"abort-controller": "^3.0.0",
"agentkeepalive": "^4.2.1",
"form-data-encoder": "1.7.2",
"formdata-node": "^4.3.2",
"node-fetch": "^2.6.7"
},
"bin": {
"openai": "bin/cli"
},
"peerDependencies": {
"ws": "^8.18.0",
"zod": "^3.23.8"
},
"peerDependenciesMeta": {
"ws": {
"optional": true
},
"zod": {
"optional": true
}
}
},
"node_modules/path-type": {
"version": "6.0.0",
"resolved": "https://registry.npmmirror.com/path-type/-/path-type-6.0.0.tgz",
"integrity": "sha512-Vj7sf++t5pBD637NSfkxpHSMfWaeig5+DKWLhcqIYx6mWQz5hdJTGDVMQiJcw1ZYkhs7AazKDGpRVji1LJCZUQ==",
"license": "MIT",
"engines": {
"node": ">=18"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/pathe": {
"version": "2.0.3",
"resolved": "https://registry.npmmirror.com/pathe/-/pathe-2.0.3.tgz",
"integrity": "sha512-WUjGcAqP1gQacoQe+OBJsFA7Ld4DyXuUIjZ5cc75cLHvJ7dtNsTugphxIADwspS+AraAUePCKrSVtPLFj/F88w==",
"license": "MIT"
},
"node_modules/perfect-debounce": {
"version": "1.0.0",
"resolved": "https://registry.npmmirror.com/perfect-debounce/-/perfect-debounce-1.0.0.tgz",
"integrity": "sha512-xCy9V055GLEqoFaHoC1SoLIaLmWctgCUaBaWxDZ7/Zx4CTyX7cJQLJOok/orfjZAh9kEYpjJa4d0KcJmCbctZA==",
"license": "MIT"
},
"node_modules/picomatch": {
"version": "2.3.1",
"resolved": "https://registry.npmmirror.com/picomatch/-/picomatch-2.3.1.tgz",
"integrity": "sha512-JU3teHTNjmE2VCGFzuY8EXzCDVwEqB2a8fsIvwaStHhAWJEeVd1o1QD80CU6+ZdEXXSLbSsuLwJjkCBWqRQUVA==",
"license": "MIT",
"engines": {
"node": ">=8.6"
},
"funding": {
"url": "https://github.com/sponsors/jonschlinkert"
}
},
"node_modules/pinia-plugin-persistedstate": {
"version": "4.2.0",
"resolved": "https://registry.npmmirror.com/pinia-plugin-persistedstate/-/pinia-plugin-persistedstate-4.2.0.tgz",
"integrity": "sha512-3buhA7ac+ssbOIx3VRCC8oHkoFwhDM9oHRCjo7nj+O8WUqnW+jRqh7eYT5eS/DNa3H28zp3dYf/nd/Vc8zj8eQ==",
"license": "MIT",
"dependencies": {
"@nuxt/kit": "^3.14.1592",
"deep-pick-omit": "^1.2.1",
"defu": "^6.1.4",
"destr": "^2.0.3"
},
"peerDependencies": {
"@pinia/nuxt": ">=0.9.0",
"pinia": ">=2.3.0"
},
"peerDependenciesMeta": {
"@pinia/nuxt": {
"optional": true
},
"pinia": {
"optional": true
}
}
},
"node_modules/pkg-types": {
"version": "2.1.0",
"resolved": "https://registry.npmmirror.com/pkg-types/-/pkg-types-2.1.0.tgz",
"integrity": "sha512-wmJwA+8ihJixSoHKxZJRBQG1oY8Yr9pGLzRmSsNms0iNWyHHAlZCa7mmKiFR10YPZuz/2k169JiS/inOjBCZ2A==",
"license": "MIT",
"dependencies": {
"confbox": "^0.2.1",
"exsolve": "^1.0.1",
"pathe": "^2.0.3"
}
},
"node_modules/pkg-types/node_modules/confbox": {
"version": "0.2.1",
"resolved": "https://registry.npmmirror.com/confbox/-/confbox-0.2.1.tgz",
"integrity": "sha512-hkT3yDPFbs95mNCy1+7qNKC6Pro+/ibzYxtM2iqEigpf0sVw+bg4Zh9/snjsBcf990vfIsg5+1U7VyiyBb3etg==",
"license": "MIT"
},
"node_modules/quansync": {
"version": "0.2.10",
"resolved": "https://registry.npmmirror.com/quansync/-/quansync-0.2.10.tgz",
"integrity": "sha512-t41VRkMYbkHyCYmOvx/6URnN80H7k4X0lLdBMGsz+maAwrJQYB1djpV6vHrQIBE0WBSGqhtEHrK9U3DWWH8v7A==",
"funding": [
{
"type": "individual",
"url": "https://github.com/sponsors/antfu"
},
{
"type": "individual",
"url": "https://github.com/sponsors/sxzz"
}
],
"license": "MIT"
},
"node_modules/queue-microtask": {
"version": "1.2.3",
"resolved": "https://registry.npmmirror.com/queue-microtask/-/queue-microtask-1.2.3.tgz",
"integrity": "sha512-NuaNSa6flKT5JaSYQzJok04JzTL1CA6aGhv5rfLW3PgqA+M2ChpZQnAC8h8i4ZFkBS8X5RqkDBHA7r4hej3K9A==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT"
},
"node_modules/rc9": {
"version": "2.1.2",
"resolved": "https://registry.npmmirror.com/rc9/-/rc9-2.1.2.tgz",
"integrity": "sha512-btXCnMmRIBINM2LDZoEmOogIZU7Qe7zn4BpomSKZ/ykbLObuBdvG+mFq11DL6fjH1DRwHhrlgtYWG96bJiC7Cg==",
"license": "MIT",
"dependencies": {
"defu": "^6.1.4",
"destr": "^2.0.3"
}
},
"node_modules/readdirp": {
"version": "4.1.2",
"resolved": "https://registry.npmmirror.com/readdirp/-/readdirp-4.1.2.tgz",
"integrity": "sha512-GDhwkLfywWL2s6vEjyhri+eXmfH6j1L7JE27WhqLeYzoh/A3DBaYGEj2H/HFZCn/kMfim73FXxEJTw06WtxQwg==",
"license": "MIT",
"engines": {
"node": ">= 14.18.0"
},
"funding": {
"type": "individual",
"url": "https://paulmillr.com/funding/"
}
},
"node_modules/reusify": {
"version": "1.1.0",
"resolved": "https://registry.npmmirror.com/reusify/-/reusify-1.1.0.tgz",
"integrity": "sha512-g6QUff04oZpHs0eG5p83rFLhHeV00ug/Yf9nZM6fLeUrPguBTkTQOdpAWWspMh55TZfVQDPaN3NQJfbVRAxdIw==",
"license": "MIT",
"engines": {
"iojs": ">=1.0.0",
"node": ">=0.10.0"
}
},
"node_modules/run-parallel": {
"version": "1.2.0",
"resolved": "https://registry.npmmirror.com/run-parallel/-/run-parallel-1.2.0.tgz",
"integrity": "sha512-5l4VyZR86LZ/lDxZTR6jqL8AFE2S0IFLMP26AbjsLVADxHdhB/c0GUsH+y39UfCi3dzz8OlQuPmnaJOMoDHQBA==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT",
"dependencies": {
"queue-microtask": "^1.2.2"
}
},
"node_modules/scule": {
"version": "1.3.0",
"resolved": "https://registry.npmmirror.com/scule/-/scule-1.3.0.tgz",
"integrity": "sha512-6FtHJEvt+pVMIB9IBY+IcCJ6Z5f1iQnytgyfKMhDKgmzYG+TeH/wx1y3l27rshSbLiSanrR9ffZDrEsmjlQF2g==",
"license": "MIT"
},
"node_modules/semver": {
"version": "7.7.1",
"resolved": "https://registry.npmmirror.com/semver/-/semver-7.7.1.tgz",
"integrity": "sha512-hlq8tAfn0m/61p4BVRcPzIGr6LKiMwo4VM6dGi6pt4qcRkmNzTcWq6eCEjEh+qXjkMDvPlOFFSGwQjoEa6gyMA==",
"license": "ISC",
"bin": {
"semver": "bin/semver.js"
},
"engines": {
"node": ">=10"
}
},
"node_modules/slash": {
"version": "5.1.0",
"resolved": "https://registry.npmmirror.com/slash/-/slash-5.1.0.tgz",
"integrity": "sha512-ZA6oR3T/pEyuqwMgAKT0/hAv8oAXckzbkmR0UkUosQ+Mc4RxGoJkRmwHgHufaenlyAgE1Mxgpdcrf75y6XcnDg==",
"license": "MIT",
"engines": {
"node": ">=14.16"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/std-env": {
"version": "3.8.1",
"resolved": "https://registry.npmmirror.com/std-env/-/std-env-3.8.1.tgz",
"integrity": "sha512-vj5lIj3Mwf9D79hBkltk5qmkFI+biIKWS2IBxEyEU3AX1tUf7AoL8nSazCOiiqQsGKIq01SClsKEzweu34uwvA==",
"license": "MIT"
},
"node_modules/strip-literal": {
"version": "3.0.0",
"resolved": "https://registry.npmmirror.com/strip-literal/-/strip-literal-3.0.0.tgz",
"integrity": "sha512-TcccoMhJOM3OebGhSBEmp3UZ2SfDMZUEBdRA/9ynfLi8yYajyWX3JiXArcJt4Umh4vISpspkQIY8ZZoCqjbviA==",
"license": "MIT",
"dependencies": {
"js-tokens": "^9.0.1"
},
"funding": {
"url": "https://github.com/sponsors/antfu"
}
},
"node_modules/tinyexec": {
"version": "0.3.2",
"resolved": "https://registry.npmmirror.com/tinyexec/-/tinyexec-0.3.2.tgz",
"integrity": "sha512-KQQR9yN7R5+OSwaK0XQoj22pwHoTlgYqmUscPYoknOoWCWfj/5/ABTMRi69FrKU5ffPVh5QcFikpWJI/P1ocHA==",
"license": "MIT"
},
"node_modules/tinyglobby": {
"version": "0.2.12",
"resolved": "https://registry.npmmirror.com/tinyglobby/-/tinyglobby-0.2.12.tgz",
"integrity": "sha512-qkf4trmKSIiMTs/E63cxH+ojC2unam7rJ0WrauAzpT3ECNTxGRMlaXxVbfxMUC/w0LaYk6jQ4y/nGR9uBO3tww==",
"license": "MIT",
"dependencies": {
"fdir": "^6.4.3",
"picomatch": "^4.0.2"
},
"engines": {
"node": ">=12.0.0"
},
"funding": {
"url": "https://github.com/sponsors/SuperchupuDev"
}
},
"node_modules/tinyglobby/node_modules/fdir": {
"version": "6.4.3",
"resolved": "https://registry.npmmirror.com/fdir/-/fdir-6.4.3.tgz",
"integrity": "sha512-PMXmW2y1hDDfTSRc9gaXIuCCRpuoz3Kaz8cUelp3smouvfT632ozg2vrT6lJsHKKOF59YLbOGfAWGUcKEfRMQw==",
"license": "MIT",
"peerDependencies": {
"picomatch": "^3 || ^4"
},
"peerDependenciesMeta": {
"picomatch": {
"optional": true
}
}
},
"node_modules/tinyglobby/node_modules/picomatch": {
"version": "4.0.2",
"resolved": "https://registry.npmmirror.com/picomatch/-/picomatch-4.0.2.tgz",
"integrity": "sha512-M7BAV6Rlcy5u+m6oPhAPFgJTzAioX/6B0DxyvDlo9l8+T3nLKbrczg2WLUyzd45L8RqfUMyGPzekbMvX2Ldkwg==",
"license": "MIT",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/jonschlinkert"
}
},
"node_modules/to-regex-range": {
"version": "5.0.1",
"resolved": "https://registry.npmmirror.com/to-regex-range/-/to-regex-range-5.0.1.tgz",
"integrity": "sha512-65P7iz6X5yEr1cwcgvQxbbIw7Uk3gOy5dIdtZ4rDveLqhrdJP+Li/Hx6tyK0NEb+2GCyneCMJiGqrADCSNk8sQ==",
"license": "MIT",
"dependencies": {
"is-number": "^7.0.0"
},
"engines": {
"node": ">=8.0"
}
},
"node_modules/tr46": {
"version": "0.0.3",
"resolved": "https://registry.npmmirror.com/tr46/-/tr46-0.0.3.tgz",
"integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==",
"license": "MIT"
},
"node_modules/ufo": {
"version": "1.5.4",
"resolved": "https://registry.npmmirror.com/ufo/-/ufo-1.5.4.tgz",
"integrity": "sha512-UsUk3byDzKd04EyoZ7U4DOlxQaD14JUKQl6/P7wiX4FNvUfm3XL246n9W5AmqwW5RSFJ27NAuM0iLscAOYUiGQ==",
"license": "MIT"
},
"node_modules/unctx": {
"version": "2.4.1",
"resolved": "https://registry.npmmirror.com/unctx/-/unctx-2.4.1.tgz",
"integrity": "sha512-AbaYw0Nm4mK4qjhns67C+kgxR2YWiwlDBPzxrN8h8C6VtAdCgditAY5Dezu3IJy4XVqAnbrXt9oQJvsn3fyozg==",
"license": "MIT",
"dependencies": {
"acorn": "^8.14.0",
"estree-walker": "^3.0.3",
"magic-string": "^0.30.17",
"unplugin": "^2.1.0"
}
},
"node_modules/undici-types": {
"version": "5.26.5",
"resolved": "https://registry.npmmirror.com/undici-types/-/undici-types-5.26.5.tgz",
"integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==",
"license": "MIT"
},
"node_modules/unicorn-magic": {
"version": "0.3.0",
"resolved": "https://registry.npmmirror.com/unicorn-magic/-/unicorn-magic-0.3.0.tgz",
"integrity": "sha512-+QBBXBCvifc56fsbuxZQ6Sic3wqqc3WWaqxs58gvJrcOuN83HGTCwz3oS5phzU9LthRNE9VrJCFCLUgHeeFnfA==",
"license": "MIT",
"engines": {
"node": ">=18"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/unimport": {
"version": "4.1.3",
"resolved": "https://registry.npmmirror.com/unimport/-/unimport-4.1.3.tgz",
"integrity": "sha512-H+IVJ7rAkE3b+oC8rSJ2FsPaVsweeMC8eKZc+C6Mz7+hxDF45AnrY/tVCNRBvzMwWNcJEV67WdAVcal27iMjOw==",
"license": "MIT",
"dependencies": {
"acorn": "^8.14.1",
"escape-string-regexp": "^5.0.0",
"estree-walker": "^3.0.3",
"local-pkg": "^1.1.1",
"magic-string": "^0.30.17",
"mlly": "^1.7.4",
"pathe": "^2.0.3",
"picomatch": "^4.0.2",
"pkg-types": "^2.1.0",
"scule": "^1.3.0",
"strip-literal": "^3.0.0",
"tinyglobby": "^0.2.12",
"unplugin": "^2.2.2",
"unplugin-utils": "^0.2.4"
},
"engines": {
"node": ">=18.12.0"
}
},
"node_modules/unimport/node_modules/picomatch": {
"version": "4.0.2",
"resolved": "https://registry.npmmirror.com/picomatch/-/picomatch-4.0.2.tgz",
"integrity": "sha512-M7BAV6Rlcy5u+m6oPhAPFgJTzAioX/6B0DxyvDlo9l8+T3nLKbrczg2WLUyzd45L8RqfUMyGPzekbMvX2Ldkwg==",
"license": "MIT",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/jonschlinkert"
}
},
"node_modules/unplugin": {
"version": "2.2.2",
"resolved": "https://registry.npmmirror.com/unplugin/-/unplugin-2.2.2.tgz",
"integrity": "sha512-Qp+iiD+qCRnUek+nDoYvtWX7tfnYyXsrOnJ452FRTgOyKmTM7TUJ3l+PLPJOOWPTUyKISKp4isC5JJPSXUjGgw==",
"license": "MIT",
"dependencies": {
"acorn": "^8.14.1",
"webpack-virtual-modules": "^0.6.2"
},
"engines": {
"node": ">=18.12.0"
}
},
"node_modules/unplugin-utils": {
"version": "0.2.4",
"resolved": "https://registry.npmmirror.com/unplugin-utils/-/unplugin-utils-0.2.4.tgz",
"integrity": "sha512-8U/MtpkPkkk3Atewj1+RcKIjb5WBimZ/WSLhhR3w6SsIj8XJuKTacSP8g+2JhfSGw0Cb125Y+2zA/IzJZDVbhA==",
"license": "MIT",
"dependencies": {
"pathe": "^2.0.2",
"picomatch": "^4.0.2"
},
"engines": {
"node": ">=18.12.0"
},
"funding": {
"url": "https://github.com/sponsors/sxzz"
}
},
"node_modules/unplugin-utils/node_modules/picomatch": {
"version": "4.0.2",
"resolved": "https://registry.npmmirror.com/picomatch/-/picomatch-4.0.2.tgz",
"integrity": "sha512-M7BAV6Rlcy5u+m6oPhAPFgJTzAioX/6B0DxyvDlo9l8+T3nLKbrczg2WLUyzd45L8RqfUMyGPzekbMvX2Ldkwg==",
"license": "MIT",
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/jonschlinkert"
}
},
"node_modules/untyped": {
"version": "2.0.0",
"resolved": "https://registry.npmmirror.com/untyped/-/untyped-2.0.0.tgz",
"integrity": "sha512-nwNCjxJTjNuLCgFr42fEak5OcLuB3ecca+9ksPFNvtfYSLpjf+iJqSIaSnIile6ZPbKYxI5k2AfXqeopGudK/g==",
"license": "MIT",
"dependencies": {
"citty": "^0.1.6",
"defu": "^6.1.4",
"jiti": "^2.4.2",
"knitwork": "^1.2.0",
"scule": "^1.3.0"
},
"bin": {
"untyped": "dist/cli.mjs"
}
},
"node_modules/web-streams-polyfill": {
"version": "4.0.0-beta.3",
"resolved": "https://registry.npmmirror.com/web-streams-polyfill/-/web-streams-polyfill-4.0.0-beta.3.tgz",
"integrity": "sha512-QW95TCTaHmsYfHDybGMwO5IJIM93I/6vTRk+daHTWFPhwh+C8Cg7j7XyKrwrj8Ib6vYXe0ocYNrmzY4xAAN6ug==",
"license": "MIT",
"engines": {
"node": ">= 14"
}
},
"node_modules/webidl-conversions": {
"version": "3.0.1",
"resolved": "https://registry.npmmirror.com/webidl-conversions/-/webidl-conversions-3.0.1.tgz",
"integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==",
"license": "BSD-2-Clause"
},
"node_modules/webpack-virtual-modules": {
"version": "0.6.2",
"resolved": "https://registry.npmmirror.com/webpack-virtual-modules/-/webpack-virtual-modules-0.6.2.tgz",
"integrity": "sha512-66/V2i5hQanC51vBQKPH4aI8NMAcBW59FVBs+rC7eGHupMyfn34q7rZIE+ETlJ+XTevqfUhVVBgSUNSW2flEUQ==",
"license": "MIT"
},
"node_modules/whatwg-url": {
"version": "5.0.0",
"resolved": "https://registry.npmmirror.com/whatwg-url/-/whatwg-url-5.0.0.tgz",
"integrity": "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==",
"license": "MIT",
"dependencies": {
"tr46": "~0.0.3",
"webidl-conversions": "^3.0.0"
}
}
}
}
```
## /package.json
```json path="/package.json"
{
"dependencies": {
"marked": "^15.0.7",
"openai": "^4.89.0",
"pinia-plugin-persistedstate": "^4.2.0"
}
}
```
## /qodana.yaml
```yaml path="/qodana.yaml"
version: "1.0"
linter: jetbrains/qodana-python:2024.3
profile:
name: qodana.recommended
include:
- name: CheckDependencyLicenses
```
## /src/Utilities.py
```py path="/src/Utilities.py"
import re, ipaddress
import subprocess
def RegexMatch(regex, text) -> bool:
"""
Regex Match
@param regex: Regex patter
@param text: Text to match
@return: Boolean indicate if the text match the regex pattern
"""
pattern = re.compile(regex)
return pattern.search(text) is not None
def GetRemoteEndpoint() -> str:
"""
Using socket to determine default interface IP address. Thanks, @NOXICS
@return:
"""
import socket
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("1.1.1.1", 80)) # Connecting to a public IP
wgd_remote_endpoint = s.getsockname()[0]
return str(wgd_remote_endpoint)
def StringToBoolean(value: str):
"""
Convert string boolean to boolean
@param value: Boolean value in string came from Configuration file
@return: Boolean value
"""
return (value.strip().replace(" ", "").lower() in
("yes", "true", "t", "1", 1))
def ValidateIPAddressesWithRange(ips: str) -> bool:
s = ips.replace(" ", "").split(",")
for ip in s:
try:
ipaddress.ip_network(ip)
except ValueError as e:
return False
return True
def ValidateIPAddresses(ips) -> bool:
s = ips.replace(" ", "").split(",")
for ip in s:
try:
ipaddress.ip_address(ip)
except ValueError as e:
return False
return True
def ValidateDNSAddress(addresses) -> tuple[bool, str]:
s = addresses.replace(" ", "").split(",")
for address in s:
if not ValidateIPAddresses(address) and not RegexMatch(
r"(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z][a-z]{0,61}[a-z]", address):
return False, f"{address} does not appear to be an valid DNS address"
return True, ""
def GenerateWireguardPublicKey(privateKey: str) -> tuple[bool, str] | tuple[bool, None]:
try:
publicKey = subprocess.check_output(f"wg pubkey", input=privateKey.encode(), shell=True,
stderr=subprocess.STDOUT)
return True, publicKey.decode().strip('\n')
except subprocess.CalledProcessError:
return False, None
def GenerateWireguardPrivateKey() -> tuple[bool, str] | tuple[bool, None]:
try:
publicKey = subprocess.check_output(f"wg genkey", shell=True,
stderr=subprocess.STDOUT)
return True, publicKey.decode().strip('\n')
except subprocess.CalledProcessError:
return False, None
```
## /src/certbot.ini
```ini path="/src/certbot.ini"
authenticator = standalone
noninteractive = true
agree-tos = true
rsa-key-size = 2048
```
## /src/dashboard.py
```py path="/src/dashboard.py"
import random, shutil, sqlite3, configparser, hashlib, ipaddress, json, os, secrets, subprocess
import time, re, urllib.error, uuid, bcrypt, psutil, pyotp, threading
from uuid import uuid4
from zipfile import ZipFile
from datetime import datetime, timedelta
from typing import Any
from jinja2 import Template
from flask import Flask, request, render_template, session, send_file
from json import JSONEncoder
from flask_cors import CORS
from icmplib import ping, traceroute
from flask.json.provider import DefaultJSONProvider
from itertools import islice
from Utilities import (
RegexMatch, GetRemoteEndpoint, StringToBoolean,
ValidateIPAddressesWithRange, ValidateDNSAddress,
GenerateWireguardPublicKey, GenerateWireguardPrivateKey
)
from packaging import version
from modules.Email import EmailSender
from modules.Log import Log
from modules.DashboardLogger import DashboardLogger
from modules.PeerJobLogger import PeerJobLogger
from modules.PeerJob import PeerJob
from modules.SystemStatus import SystemStatus
SystemStatus = SystemStatus()
DASHBOARD_VERSION = 'v4.2.2'
CONFIGURATION_PATH = os.getenv('CONFIGURATION_PATH', '.')
DB_PATH = os.path.join(CONFIGURATION_PATH, 'db')
if not os.path.isdir(DB_PATH):
os.mkdir(DB_PATH)
DASHBOARD_CONF = os.path.join(CONFIGURATION_PATH, 'wg-dashboard.ini')
UPDATE = None
app = Flask("WGDashboard", template_folder=os.path.abspath("./static/app/dist"))
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 5206928
app.secret_key = secrets.token_urlsafe(32)
class CustomJsonEncoder(DefaultJSONProvider):
def __init__(self, app):
super().__init__(app)
def default(self, o):
if callable(getattr(o, "toJson", None)):
return o.toJson()
return super().default(self)
app.json = CustomJsonEncoder(app)
'''
Response Object
'''
def ResponseObject(status=True, message=None, data=None, status_code = 200) -> Flask.response_class:
response = Flask.make_response(app, {
"status": status,
"message": message,
"data": data
})
response.status_code = status_code
response.content_type = "application/json"
return response
"""
Peer Jobs
"""
class PeerJobs:
def __init__(self):
self.Jobs: list[PeerJob] = []
self.jobdb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard_job.db'),
check_same_thread=False)
self.jobdb.row_factory = sqlite3.Row
self.__createPeerJobsDatabase()
self.__getJobs()
def __getJobs(self):
self.Jobs.clear()
with self.jobdb:
jobdbCursor = self.jobdb.cursor()
jobs = jobdbCursor.execute("SELECT * FROM PeerJobs WHERE ExpireDate IS NULL").fetchall()
for job in jobs:
self.Jobs.append(PeerJob(
job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'],
job['CreationDate'], job['ExpireDate'], job['Action']))
def getAllJobs(self, configuration: str = None):
if configuration is not None:
with self.jobdb:
jobdbCursor = self.jobdb.cursor()
jobs = jobdbCursor.execute(
f"SELECT * FROM PeerJobs WHERE Configuration = ?", (configuration, )).fetchall()
j = []
for job in jobs:
j.append(PeerJob(
job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'],
job['CreationDate'], job['ExpireDate'], job['Action']))
return j
return []
def __createPeerJobsDatabase(self):
with self.jobdb:
jobdbCursor = self.jobdb.cursor()
existingTable = jobdbCursor.execute("SELECT name from sqlite_master where type='table'").fetchall()
existingTable = [t['name'] for t in existingTable]
if "PeerJobs" not in existingTable:
jobdbCursor.execute('''
CREATE TABLE PeerJobs (JobID VARCHAR NOT NULL, Configuration VARCHAR NOT NULL, Peer VARCHAR NOT NULL,
Field VARCHAR NOT NULL, Operator VARCHAR NOT NULL, Value VARCHAR NOT NULL, CreationDate DATETIME,
ExpireDate DATETIME, Action VARCHAR NOT NULL, PRIMARY KEY (JobID))
''')
self.jobdb.commit()
def toJson(self):
return [x.toJson() for x in self.Jobs]
def searchJob(self, Configuration: str, Peer: str):
return list(filter(lambda x: x.Configuration == Configuration and x.Peer == Peer, self.Jobs))
def searchJobById(self, JobID):
return list(filter(lambda x: x.JobID == JobID, self.Jobs))
def saveJob(self, Job: PeerJob) -> tuple[bool, list] | tuple[bool, str]:
try:
with self.jobdb:
jobdbCursor = self.jobdb.cursor()
if len(self.searchJobById(Job.JobID)) == 0:
jobdbCursor.execute('''
INSERT INTO PeerJobs VALUES (?, ?, ?, ?, ?, ?, strftime('%Y-%m-%d %H:%M:%S','now'), NULL, ?)
''', (Job.JobID, Job.Configuration, Job.Peer, Job.Field, Job.Operator, Job.Value, Job.Action,))
JobLogger.log(Job.JobID, Message=f"Job is created if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}")
else:
currentJob = jobdbCursor.execute('SELECT * FROM PeerJobs WHERE JobID = ?', (Job.JobID, )).fetchone()
if currentJob is not None:
jobdbCursor.execute('''
UPDATE PeerJobs SET Field = ?, Operator = ?, Value = ?, Action = ? WHERE JobID = ?
''', (Job.Field, Job.Operator, Job.Value, Job.Action, Job.JobID))
JobLogger.log(Job.JobID,
Message=f"Job is updated from if {currentJob['Field']} {currentJob['Operator']} {currentJob['value']} then {currentJob['Action']}; to if {Job.Field} {Job.Operator} {Job.Value} then {Job.Action}")
self.jobdb.commit()
self.__getJobs()
return True, list(
filter(lambda x: x.Configuration == Job.Configuration and x.Peer == Job.Peer and x.JobID == Job.JobID,
self.Jobs))
except Exception as e:
return False, str(e)
def deleteJob(self, Job: PeerJob) -> tuple[bool, list] | tuple[bool, str]:
try:
if (len(str(Job.CreationDate))) == 0:
return False, "Job does not exist"
with self.jobdb:
jobdbCursor = self.jobdb.cursor()
jobdbCursor.execute('''
UPDATE PeerJobs SET ExpireDate = strftime('%Y-%m-%d %H:%M:%S','now') WHERE JobID = ?
''', (Job.JobID,))
self.jobdb.commit()
JobLogger.log(Job.JobID, Message=f"Job is removed due to being deleted or finshed.")
self.__getJobs()
return True, list(
filter(lambda x: x.Configuration == Job.Configuration and x.Peer == Job.Peer and x.JobID == Job.JobID,
self.Jobs))
except Exception as e:
return False, str(e)
def updateJobConfigurationName(self, ConfigurationName: str, NewConfigurationName: str) -> tuple[bool, str] | tuple[bool, None]:
try:
with self.jobdb:
jobdbCursor = self.jobdb.cursor()
jobdbCursor.execute('''
UPDATE PeerJobs SET Configuration = ? WHERE Configuration = ?
''', (NewConfigurationName, ConfigurationName, ))
self.jobdb.commit()
self.__getJobs()
return True, None
except Exception as e:
return False, str(e)
def runJob(self):
needToDelete = []
for job in self.Jobs:
c = WireguardConfigurations.get(job.Configuration)
if c is not None:
f, fp = c.searchPeer(job.Peer)
if f:
if job.Field in ["total_receive", "total_sent", "total_data"]:
s = job.Field.split("_")[1]
x: float = getattr(fp, f"total_{s}") + getattr(fp, f"cumu_{s}")
y: float = float(job.Value)
else:
x: datetime = datetime.now()
y: datetime = datetime.strptime(job.Value, "%Y-%m-%d %H:%M:%S")
runAction: bool = self.__runJob_Compare(x, y, job.Operator)
if runAction:
s = False
if job.Action == "restrict":
s = c.restrictPeers([fp.id]).get_json()
elif job.Action == "delete":
s = c.deletePeers([fp.id]).get_json()
if s['status'] is True:
JobLogger.log(job.JobID, s["status"],
f"Peer {fp.id} from {c.Name} is successfully {job.Action}ed."
)
needToDelete.append(job)
else:
JobLogger.log(job.JobID, s["status"],
f"Peer {fp.id} from {c.Name} failed {job.Action}ed."
)
else:
JobLogger.log(job.JobID, False,
f"Somehow can't find this peer {job.Peer} from {c.Name} failed {job.Action}ed."
)
else:
JobLogger.log(job.JobID, False,
f"Somehow can't find this peer {job.Peer} from {job.Configuration} failed {job.Action}ed."
)
for j in needToDelete:
self.deleteJob(j)
def __runJob_Compare(self, x: float | datetime, y: float | datetime, operator: str):
if operator == "eq":
return x == y
if operator == "neq":
return x != y
if operator == "lgt":
return x > y
if operator == "lst":
return x < y
"""
Peer Share Link
"""
class PeerShareLink:
def __init__(self, ShareID:str, Configuration: str, Peer: str, ExpireDate: datetime, ShareDate: datetime):
self.ShareID = ShareID
self.Peer = Peer
self.Configuration = Configuration
self.ShareDate = ShareDate
self.ExpireDate = ExpireDate
def toJson(self):
return {
"ShareID": self.ShareID,
"Peer": self.Peer,
"Configuration": self.Configuration,
"ExpireDate": self.ExpireDate
}
"""
Peer Share Links
"""
class PeerShareLinks:
def __init__(self):
self.Links: list[PeerShareLink] = []
existingTables = sqlSelect("SELECT name FROM sqlite_master WHERE type='table' and name = 'PeerShareLinks'").fetchall()
if len(existingTables) == 0:
sqlUpdate(
"""
CREATE TABLE PeerShareLinks (
ShareID VARCHAR NOT NULL PRIMARY KEY, Configuration VARCHAR NOT NULL, Peer VARCHAR NOT NULL,
ExpireDate DATETIME,
SharedDate DATETIME DEFAULT (datetime('now', 'localtime'))
)
"""
)
self.__getSharedLinks()
def __getSharedLinks(self):
self.Links.clear()
allLinks = sqlSelect("SELECT * FROM PeerShareLinks WHERE ExpireDate IS NULL OR ExpireDate > datetime('now', 'localtime')").fetchall()
for link in allLinks:
self.Links.append(PeerShareLink(*link))
def getLink(self, Configuration: str, Peer: str) -> list[PeerShareLink]:
return list(filter(lambda x : x.Configuration == Configuration and x.Peer == Peer, self.Links))
def getLinkByID(self, ShareID: str) -> list[PeerShareLink]:
self.__getSharedLinks()
return list(filter(lambda x : x.ShareID == ShareID, self.Links))
def addLink(self, Configuration: str, Peer: str, ExpireDate: datetime = None) -> tuple[bool, str]:
try:
newShareID = str(uuid.uuid4())
if len(self.getLink(Configuration, Peer)) > 0:
sqlUpdate("UPDATE PeerShareLinks SET ExpireDate = datetime('now', 'localtime') WHERE Configuration = ? AND Peer = ?", (Configuration, Peer, ))
sqlUpdate("INSERT INTO PeerShareLinks (ShareID, Configuration, Peer, ExpireDate) VALUES (?, ?, ?, ?)", (newShareID, Configuration, Peer, ExpireDate, ))
self.__getSharedLinks()
except Exception as e:
return False, str(e)
return True, newShareID
def updateLinkExpireDate(self, ShareID, ExpireDate: datetime = None) -> tuple[bool, str]:
sqlUpdate("UPDATE PeerShareLinks SET ExpireDate = ? WHERE ShareID = ?;", (ExpireDate, ShareID, ))
self.__getSharedLinks()
return True, ""
"""
WireGuard Configuration
"""
class WireguardConfiguration:
class InvalidConfigurationFileException(Exception):
def __init__(self, m):
self.message = m
def __str__(self):
return self.message
def __init__(self, name: str = None, data: dict = None, backup: dict = None, startup: bool = False, wg: bool = True):
self.__parser: configparser.ConfigParser = configparser.RawConfigParser(strict=False)
self.__parser.optionxform = str
self.__configFileModifiedTime = None
self.Status: bool = False
self.Name: str = ""
self.PrivateKey: str = ""
self.PublicKey: str = ""
self.ListenPort: str = ""
self.Address: str = ""
self.DNS: str = ""
self.Table: str = ""
self.MTU: str = ""
self.PreUp: str = ""
self.PostUp: str = ""
self.PreDown: str = ""
self.PostDown: str = ""
self.SaveConfig: bool = True
self.Name = name
self.Protocol = "wg" if wg else "awg"
self.configPath = os.path.join(self.__getProtocolPath(), f'{self.Name}.conf') if wg else os.path.join(DashboardConfig.GetConfig("Server", "awg_conf_path")[1], f'{self.Name}.conf')
if name is not None:
if data is not None and "Backup" in data.keys():
db = self.__importDatabase(
os.path.join(
self.__getProtocolPath(),
'WGDashboard_Backup',
data["Backup"].replace(".conf", ".sql")))
else:
self.createDatabase()
self.__parseConfigurationFile()
self.__initPeersList()
else:
self.Name = data["ConfigurationName"]
self.configPath = os.path.join(self.__getProtocolPath(), f'{self.Name}.conf')
for i in dir(self):
if str(i) in data.keys():
if isinstance(getattr(self, i), bool):
setattr(self, i, StringToBoolean(data[i]))
else:
setattr(self, i, str(data[i]))
self.__parser["Interface"] = {
"PrivateKey": self.PrivateKey,
"Address": self.Address,
"ListenPort": self.ListenPort,
"PreUp": f"{self.PreUp}",
"PreDown": f"{self.PreDown}",
"PostUp": f"{self.PostUp}",
"PostDown": f"{self.PostDown}",
"SaveConfig": "true"
}
if self.Protocol == 'awg':
self.__parser["Interface"]["Jc"] = self.Jc
self.__parser["Interface"]["Jc"] = self.Jc
self.__parser["Interface"]["Jmin"] = self.Jmin
self.__parser["Interface"]["Jmax"] = self.Jmax
self.__parser["Interface"]["S1"] = self.S1
self.__parser["Interface"]["S2"] = self.S2
self.__parser["Interface"]["H1"] = self.H1
self.__parser["Interface"]["H2"] = self.H2
self.__parser["Interface"]["H3"] = self.H3
self.__parser["Interface"]["H4"] = self.H4
if "Backup" not in data.keys():
self.createDatabase()
with open(self.configPath, "w+") as configFile:
self.__parser.write(configFile)
print(f"[WGDashboard] Configuration file {self.configPath} created")
self.__initPeersList()
if not os.path.exists(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup')):
os.mkdir(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup'))
print(f"[WGDashboard] Initialized Configuration: {name}")
if self.getAutostartStatus() and not self.getStatus() and startup:
self.toggleConfiguration()
print(f"[WGDashboard] Autostart Configuration: {name}")
def __getProtocolPath(self):
return DashboardConfig.GetConfig("Server", "wg_conf_path")[1] if self.Protocol == "wg" \
else DashboardConfig.GetConfig("Server", "awg_conf_path")[1]
def __initPeersList(self):
self.Peers: list[Peer] = []
self.getPeersList()
self.getRestrictedPeersList()
def getRawConfigurationFile(self):
return open(self.configPath, 'r').read()
def updateRawConfigurationFile(self, newRawConfiguration):
backupStatus, backup = self.backupConfigurationFile()
if not backupStatus:
return False, "Cannot create backup"
if self.Status:
self.toggleConfiguration()
with open(self.configPath, 'w') as f:
f.write(newRawConfiguration)
status, err = self.toggleConfiguration()
if not status:
restoreStatus = self.restoreBackup(backup['filename'])
print(f"Restore status: {restoreStatus}")
self.toggleConfiguration()
return False, err
return True, None
def __parseConfigurationFile(self):
with open(self.configPath, 'r') as f:
original = [l.rstrip("\n") for l in f.readlines()]
try:
start = original.index("[Interface]")
# Clean
for i in range(start, len(original)):
if original[i] == "[Peer]":
break
split = re.split(r'\s*=\s*', original[i], 1)
if len(split) == 2:
key = split[0]
if key in dir(self):
if isinstance(getattr(self, key), bool):
setattr(self, key, False)
else:
setattr(self, key, "")
# Set
for i in range(start, len(original)):
if original[i] == "[Peer]":
break
split = re.split(r'\s*=\s*', original[i], 1)
if len(split) == 2:
key = split[0]
value = split[1]
if key in dir(self):
if isinstance(getattr(self, key), bool):
setattr(self, key, StringToBoolean(value))
else:
if len(getattr(self, key)) > 0:
setattr(self, key, f"{getattr(self, key)}, {value}")
else:
setattr(self, key, value)
except ValueError as e:
raise self.InvalidConfigurationFileException(
"[Interface] section not found in " + self.configPath)
if self.PrivateKey:
self.PublicKey = self.__getPublicKey()
self.Status = self.getStatus()
def __dropDatabase(self):
existingTables = sqlSelect(f"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '{self.Name}%'").fetchall()
for t in existingTables:
sqlUpdate("DROP TABLE '%s'" % t['name'])
existingTables = sqlSelect(f"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '{self.Name}%'").fetchall()
def createDatabase(self, dbName = None):
if dbName is None:
dbName = self.Name
existingTables = sqlSelect("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
existingTables = [t['name'] for t in existingTables]
if dbName not in existingTables:
sqlUpdate(
"""
CREATE TABLE '%s'(
id VARCHAR NOT NULL, private_key VARCHAR NULL, DNS VARCHAR NULL,
endpoint_allowed_ip VARCHAR NULL, name VARCHAR NULL, total_receive FLOAT NULL,
total_sent FLOAT NULL, total_data FLOAT NULL, endpoint VARCHAR NULL,
status VARCHAR NULL, latest_handshake VARCHAR NULL, allowed_ip VARCHAR NULL,
cumu_receive FLOAT NULL, cumu_sent FLOAT NULL, cumu_data FLOAT NULL, mtu INT NULL,
keepalive INT NULL, remote_endpoint VARCHAR NULL, preshared_key VARCHAR NULL,
PRIMARY KEY (id)
)
""" % dbName
)
if f'{dbName}_restrict_access' not in existingTables:
sqlUpdate(
"""
CREATE TABLE '%s_restrict_access' (
id VARCHAR NOT NULL, private_key VARCHAR NULL, DNS VARCHAR NULL,
endpoint_allowed_ip VARCHAR NULL, name VARCHAR NULL, total_receive FLOAT NULL,
total_sent FLOAT NULL, total_data FLOAT NULL, endpoint VARCHAR NULL,
status VARCHAR NULL, latest_handshake VARCHAR NULL, allowed_ip VARCHAR NULL,
cumu_receive FLOAT NULL, cumu_sent FLOAT NULL, cumu_data FLOAT NULL, mtu INT NULL,
keepalive INT NULL, remote_endpoint VARCHAR NULL, preshared_key VARCHAR NULL,
PRIMARY KEY (id)
)
""" % dbName
)
if f'{dbName}_transfer' not in existingTables:
sqlUpdate(
"""
CREATE TABLE '%s_transfer' (
id VARCHAR NOT NULL, total_receive FLOAT NULL,
total_sent FLOAT NULL, total_data FLOAT NULL,
cumu_receive FLOAT NULL, cumu_sent FLOAT NULL, cumu_data FLOAT NULL, time DATETIME
)
""" % dbName
)
if f'{dbName}_deleted' not in existingTables:
sqlUpdate(
"""
CREATE TABLE '%s_deleted' (
id VARCHAR NOT NULL, private_key VARCHAR NULL, DNS VARCHAR NULL,
endpoint_allowed_ip VARCHAR NULL, name VARCHAR NULL, total_receive FLOAT NULL,
total_sent FLOAT NULL, total_data FLOAT NULL, endpoint VARCHAR NULL,
status VARCHAR NULL, latest_handshake VARCHAR NULL, allowed_ip VARCHAR NULL,
cumu_receive FLOAT NULL, cumu_sent FLOAT NULL, cumu_data FLOAT NULL, mtu INT NULL,
keepalive INT NULL, remote_endpoint VARCHAR NULL, preshared_key VARCHAR NULL,
PRIMARY KEY (id)
)
""" % dbName
)
def __dumpDatabase(self):
for line in sqldb.iterdump():
if (line.startswith(f"INSERT INTO \"{self.Name}\"")
or line.startswith(f'INSERT INTO "{self.Name}_restrict_access"')
or line.startswith(f'INSERT INTO "{self.Name}_transfer"')
or line.startswith(f'INSERT INTO "{self.Name}_deleted"')
):
yield line
def __importDatabase(self, sqlFilePath) -> bool:
self.__dropDatabase()
self.createDatabase()
if not os.path.exists(sqlFilePath):
return False
with open(sqlFilePath, 'r') as f:
for l in f.readlines():
l = l.rstrip("\n")
if len(l) > 0:
sqlUpdate(l)
return True
def __getPublicKey(self) -> str:
return GenerateWireguardPublicKey(self.PrivateKey)[1]
def getStatus(self) -> bool:
self.Status = self.Name in psutil.net_if_addrs().keys()
return self.Status
def getAutostartStatus(self):
s, d = DashboardConfig.GetConfig("WireGuardConfiguration", "autostart")
return self.Name in d
def getRestrictedPeers(self):
self.RestrictedPeers = []
restricted = sqlSelect("SELECT * FROM '%s_restrict_access'" % self.Name).fetchall()
for i in restricted:
self.RestrictedPeers.append(Peer(i, self))
def configurationFileChanged(self) :
mt = os.path.getmtime(self.configPath)
changed = self.__configFileModifiedTime is None or self.__configFileModifiedTime != mt
self.__configFileModifiedTime = mt
return changed
def getPeers(self):
if self.configurationFileChanged():
self.Peers = []
with open(self.configPath, 'r') as configFile:
p = []
pCounter = -1
content = configFile.read().split('\n')
try:
peerStarts = content.index("[Peer]")
content = content[peerStarts:]
for i in content:
if not RegexMatch("#(.*)", i) and not RegexMatch(";(.*)", i):
if i == "[Peer]":
pCounter += 1
p.append({})
p[pCounter]["name"] = ""
else:
if len(i) > 0:
split = re.split(r'\s*=\s*', i, 1)
if len(split) == 2:
p[pCounter][split[0]] = split[1]
if RegexMatch("#Name# = (.*)", i):
split = re.split(r'\s*=\s*', i, 1)
if len(split) == 2:
p[pCounter]["name"] = split[1]
for i in p:
if "PublicKey" in i.keys():
checkIfExist = sqlSelect("SELECT * FROM '%s' WHERE id = ?" % self.Name,
((i['PublicKey']),)).fetchone()
if checkIfExist is None:
newPeer = {
"id": i['PublicKey'],
"private_key": "",
"DNS": DashboardConfig.GetConfig("Peers", "peer_global_DNS")[1],
"endpoint_allowed_ip": DashboardConfig.GetConfig("Peers", "peer_endpoint_allowed_ip")[
1],
"name": i.get("name"),
"total_receive": 0,
"total_sent": 0,
"total_data": 0,
"endpoint": "N/A",
"status": "stopped",
"latest_handshake": "N/A",
"allowed_ip": i.get("AllowedIPs", "N/A"),
"cumu_receive": 0,
"cumu_sent": 0,
"cumu_data": 0,
"traffic": [],
"mtu": DashboardConfig.GetConfig("Peers", "peer_mtu")[1],
"keepalive": DashboardConfig.GetConfig("Peers", "peer_keep_alive")[1],
"remote_endpoint": DashboardConfig.GetConfig("Peers", "remote_endpoint")[1],
"preshared_key": i["PresharedKey"] if "PresharedKey" in i.keys() else ""
}
sqlUpdate(
"""
INSERT INTO '%s'
VALUES (:id, :private_key, :DNS, :endpoint_allowed_ip, :name, :total_receive, :total_sent,
:total_data, :endpoint, :status, :latest_handshake, :allowed_ip, :cumu_receive, :cumu_sent,
:cumu_data, :mtu, :keepalive, :remote_endpoint, :preshared_key);
""" % self.Name
, newPeer)
self.Peers.append(Peer(newPeer, self))
else:
sqlUpdate("UPDATE '%s' SET allowed_ip = ? WHERE id = ?" % self.Name,
(i.get("AllowedIPs", "N/A"), i['PublicKey'],))
self.Peers.append(Peer(checkIfExist, self))
except Exception as e:
if __name__ == '__main__':
print(f"[WGDashboard] {self.Name} Error: {str(e)}")
else:
self.Peers.clear()
checkIfExist = sqlSelect("SELECT * FROM '%s'" % self.Name).fetchall()
for i in checkIfExist:
self.Peers.append(Peer(i, self))
def addPeers(self, peers: list) -> tuple[bool, dict]:
result = {
"message": None,
"peers": []
}
try:
for i in peers:
newPeer = {
"id": i['id'],
"private_key": i['private_key'],
"DNS": i['DNS'],
"endpoint_allowed_ip": i['endpoint_allowed_ip'],
"name": i['name'],
"total_receive": 0,
"total_sent": 0,
"total_data": 0,
"endpoint": "N/A",
"status": "stopped",
"latest_handshake": "N/A",
"allowed_ip": i.get("allowed_ip", "N/A"),
"cumu_receive": 0,
"cumu_sent": 0,
"cumu_data": 0,
"traffic": [],
"mtu": i['mtu'],
"keepalive": i['keepalive'],
"remote_endpoint": DashboardConfig.GetConfig("Peers", "remote_endpoint")[1],
"preshared_key": i["preshared_key"]
}
sqlUpdate(
"""
INSERT INTO '%s'
VALUES (:id, :private_key, :DNS, :endpoint_allowed_ip, :name, :total_receive, :total_sent,
:total_data, :endpoint, :status, :latest_handshake, :allowed_ip, :cumu_receive, :cumu_sent,
:cumu_data, :mtu, :keepalive, :remote_endpoint, :preshared_key);
""" % self.Name
, newPeer)
for p in peers:
presharedKeyExist = len(p['preshared_key']) > 0
rd = random.Random()
uid = str(uuid.UUID(int=rd.getrandbits(128), version=4))
if presharedKeyExist:
with open(uid, "w+") as f:
f.write(p['preshared_key'])
subprocess.check_output(f"{self.Protocol} set {self.Name} peer {p['id']} allowed-ips {p['allowed_ip'].replace(' ', '')}{f' preshared-key {uid}' if presharedKeyExist else ''}",
shell=True, stderr=subprocess.STDOUT)
if presharedKeyExist:
os.remove(uid)
subprocess.check_output(
f"{self.Protocol}-quick save {self.Name}", shell=True, stderr=subprocess.STDOUT)
self.getPeersList()
for p in peers:
p = self.searchPeer(p['id'])
if p[0]:
result['peers'].append(p[1])
return True, result
except Exception as e:
result['message'] = str(e)
return False, result
def searchPeer(self, publicKey):
for i in self.Peers:
if i.id == publicKey:
return True, i
return False, None
def allowAccessPeers(self, listOfPublicKeys):
if not self.getStatus():
self.toggleConfiguration()
for i in listOfPublicKeys:
p = sqlSelect("SELECT * FROM '%s_restrict_access' WHERE id = ?" % self.Name, (i,)).fetchone()
if p is not None:
sqlUpdate("INSERT INTO '%s' SELECT * FROM '%s_restrict_access' WHERE id = ?"
% (self.Name, self.Name,), (p['id'],))
sqlUpdate("DELETE FROM '%s_restrict_access' WHERE id = ?"
% self.Name, (p['id'],))
presharedKeyExist = len(p['preshared_key']) > 0
rd = random.Random()
uid = str(uuid.UUID(int=rd.getrandbits(128), version=4))
if presharedKeyExist:
with open(uid, "w+") as f:
f.write(p['preshared_key'])
subprocess.check_output(f"{self.Protocol} set {self.Name} peer {p['id']} allowed-ips {p['allowed_ip'].replace(' ', '')}{f' preshared-key {uid}' if presharedKeyExist else ''}",
shell=True, stderr=subprocess.STDOUT)
if presharedKeyExist: os.remove(uid)
else:
return ResponseObject(False, "Failed to allow access of peer " + i)
if not self.__wgSave():
return ResponseObject(False, "Failed to save configuration through WireGuard")
self.getPeers()
return ResponseObject(True, "Allow access successfully")
def restrictPeers(self, listOfPublicKeys):
numOfRestrictedPeers = 0
numOfFailedToRestrictPeers = 0
if not self.getStatus():
self.toggleConfiguration()
for p in listOfPublicKeys:
found, pf = self.searchPeer(p)
if found:
try:
subprocess.check_output(f"{self.Protocol} set {self.Name} peer {pf.id} remove",
shell=True, stderr=subprocess.STDOUT)
sqlUpdate("INSERT INTO '%s_restrict_access' SELECT * FROM '%s' WHERE id = ?" %
(self.Name, self.Name,), (pf.id,))
sqlUpdate("UPDATE '%s_restrict_access' SET status = 'stopped' WHERE id = ?" %
(self.Name,), (pf.id,))
sqlUpdate("DELETE FROM '%s' WHERE id = ?" % self.Name, (pf.id,))
numOfRestrictedPeers += 1
except Exception as e:
numOfFailedToRestrictPeers += 1
if not self.__wgSave():
return ResponseObject(False, "Failed to save configuration through WireGuard")
self.getPeers()
if numOfRestrictedPeers == len(listOfPublicKeys):
return ResponseObject(True, f"Restricted {numOfRestrictedPeers} peer(s)")
return ResponseObject(False,
f"Restricted {numOfRestrictedPeers} peer(s) successfully. Failed to restrict {numOfFailedToRestrictPeers} peer(s)")
pass
def deletePeers(self, listOfPublicKeys):
numOfDeletedPeers = 0
numOfFailedToDeletePeers = 0
if not self.getStatus():
self.toggleConfiguration()
for p in listOfPublicKeys:
found, pf = self.searchPeer(p)
if found:
try:
subprocess.check_output(f"{self.Protocol} set {self.Name} peer {pf.id} remove",
shell=True, stderr=subprocess.STDOUT)
sqlUpdate("DELETE FROM '%s' WHERE id = ?" % self.Name, (pf.id,))
numOfDeletedPeers += 1
except Exception as e:
numOfFailedToDeletePeers += 1
if not self.__wgSave():
return ResponseObject(False, "Failed to save configuration through WireGuard")
self.getPeers()
if numOfDeletedPeers == len(listOfPublicKeys):
return ResponseObject(True, f"Deleted {numOfDeletedPeers} peer(s)")
return ResponseObject(False,
f"Deleted {numOfDeletedPeers} peer(s) successfully. Failed to delete {numOfFailedToDeletePeers} peer(s)")
def __wgSave(self) -> tuple[bool, str] | tuple[bool, None]:
try:
subprocess.check_output(f"{self.Protocol}-quick save {self.Name}", shell=True, stderr=subprocess.STDOUT)
return True, None
except subprocess.CalledProcessError as e:
return False, str(e)
def getPeersLatestHandshake(self):
if not self.getStatus():
self.toggleConfiguration()
try:
latestHandshake = subprocess.check_output(f"{self.Protocol} show {self.Name} latest-handshakes",
shell=True, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
return "stopped"
latestHandshake = latestHandshake.decode("UTF-8").split()
count = 0
now = datetime.now()
time_delta = timedelta(minutes=2)
for _ in range(int(len(latestHandshake) / 2)):
minus = now - datetime.fromtimestamp(int(latestHandshake[count + 1]))
if minus < time_delta:
status = "running"
else:
status = "stopped"
if int(latestHandshake[count + 1]) > 0:
sqlUpdate("UPDATE '%s' SET latest_handshake = ?, status = ? WHERE id= ?" % self.Name
, (str(minus).split(".", maxsplit=1)[0], status, latestHandshake[count],))
else:
sqlUpdate("UPDATE '%s' SET latest_handshake = 'No Handshake', status = ? WHERE id= ?" % self.Name
, (status, latestHandshake[count],))
count += 2
def getPeersTransfer(self):
if not self.getStatus():
self.toggleConfiguration()
try:
data_usage = subprocess.check_output(f"{self.Protocol} show {self.Name} transfer",
shell=True, stderr=subprocess.STDOUT)
data_usage = data_usage.decode("UTF-8").split("\n")
data_usage = [p.split("\t") for p in data_usage]
for i in range(len(data_usage)):
if len(data_usage[i]) == 3:
cur_i = sqlSelect(
"SELECT total_receive, total_sent, cumu_receive, cumu_sent, status FROM '%s' WHERE id= ? "
% self.Name, (data_usage[i][0],)).fetchone()
if cur_i is not None:
cur_i = dict(cur_i)
total_sent = cur_i['total_sent']
total_receive = cur_i['total_receive']
cur_total_sent = float(data_usage[i][2]) / (1024 ** 3)
cur_total_receive = float(data_usage[i][1]) / (1024 ** 3)
cumulative_receive = cur_i['cumu_receive'] + total_receive
cumulative_sent = cur_i['cumu_sent'] + total_sent
if total_sent <= cur_total_sent and total_receive <= cur_total_receive:
total_sent = cur_total_sent
total_receive = cur_total_receive
else:
sqlUpdate(
"UPDATE '%s' SET cumu_receive = ?, cumu_sent = ?, cumu_data = ? WHERE id = ?" %
self.Name, (cumulative_receive, cumulative_sent,
cumulative_sent + cumulative_receive,
data_usage[i][0],))
total_sent = 0
total_receive = 0
_, p = self.searchPeer(data_usage[i][0])
if p.total_receive != total_receive or p.total_sent != total_sent:
sqlUpdate(
"UPDATE '%s' SET total_receive = ?, total_sent = ?, total_data = ? WHERE id = ?"
% self.Name, (total_receive, total_sent,
total_receive + total_sent, data_usage[i][0],))
except Exception as e:
print(f"[WGDashboard] {self.Name} Error: {str(e)} {str(e.__traceback__)}")
def getPeersEndpoint(self):
if not self.getStatus():
self.toggleConfiguration()
try:
data_usage = subprocess.check_output(f"{self.Protocol} show {self.Name} endpoints",
shell=True, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
return "stopped"
data_usage = data_usage.decode("UTF-8").split()
count = 0
for _ in range(int(len(data_usage) / 2)):
sqlUpdate("UPDATE '%s' SET endpoint = ? WHERE id = ?" % self.Name
, (data_usage[count + 1], data_usage[count],))
count += 2
def toggleConfiguration(self) -> [bool, str]:
self.getStatus()
if self.Status:
try:
check = subprocess.check_output(f"{self.Protocol}-quick down {self.Name}",
shell=True, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
return False, str(exc.output.strip().decode("utf-8"))
else:
try:
check = subprocess.check_output(f"{self.Protocol}-quick up {self.Name}", shell=True, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
return False, str(exc.output.strip().decode("utf-8"))
self.__parseConfigurationFile()
self.getStatus()
return True, None
def getPeersList(self):
self.getPeers()
return self.Peers
def getRestrictedPeersList(self) -> list:
self.getRestrictedPeers()
return self.RestrictedPeers
def toJson(self):
self.Status = self.getStatus()
return {
"Status": self.Status,
"Name": self.Name,
"PrivateKey": self.PrivateKey,
"PublicKey": self.PublicKey,
"Address": self.Address,
"ListenPort": self.ListenPort,
"PreUp": self.PreUp,
"PreDown": self.PreDown,
"PostUp": self.PostUp,
"PostDown": self.PostDown,
"SaveConfig": self.SaveConfig,
"DataUsage": {
"Total": sum(list(map(lambda x: x.cumu_data + x.total_data, self.Peers))),
"Sent": sum(list(map(lambda x: x.cumu_sent + x.total_sent, self.Peers))),
"Receive": sum(list(map(lambda x: x.cumu_receive + x.total_receive, self.Peers)))
},
"ConnectedPeers": len(list(filter(lambda x: x.status == "running", self.Peers))),
"TotalPeers": len(self.Peers),
"Protocol": self.Protocol
}
def backupConfigurationFile(self) -> tuple[bool, dict[str, str]]:
if not os.path.exists(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup')):
os.mkdir(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup'))
time = datetime.now().strftime("%Y%m%d%H%M%S")
shutil.copy(
self.configPath,
os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f'{self.Name}_{time}.conf')
)
with open(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f'{self.Name}_{time}.sql'), 'w+') as f:
for l in self.__dumpDatabase():
f.write(l + "\n")
return True, {
"filename": f'{self.Name}_{time}.conf',
"backupDate": datetime.now().strftime("%Y%m%d%H%M%S")
}
def getBackups(self, databaseContent: bool = False) -> list[dict[str: str, str: str, str: str]]:
backups = []
directory = os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup')
files = [(file, os.path.getctime(os.path.join(directory, file)))
for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))]
files.sort(key=lambda x: x[1], reverse=True)
for f, ct in files:
if RegexMatch(f"^({self.Name})_(.*)\\.(conf)$", f):
s = re.search(f"^({self.Name})_(.*)\\.(conf)$", f)
date = s.group(2)
d = {
"filename": f,
"backupDate": date,
"content": open(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f), 'r').read()
}
if f.replace(".conf", ".sql") in list(os.listdir(directory)):
d['database'] = True
if databaseContent:
d['databaseContent'] = open(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', f.replace(".conf", ".sql")), 'r').read()
backups.append(d)
return backups
def restoreBackup(self, backupFileName: str) -> bool:
backups = list(map(lambda x : x['filename'], self.getBackups()))
if backupFileName not in backups:
return False
if self.Status:
self.toggleConfiguration()
target = os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backupFileName)
targetSQL = os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backupFileName.replace(".conf", ".sql"))
if not os.path.exists(target):
return False
targetContent = open(target, 'r').read()
try:
with open(self.configPath, 'w') as f:
f.write(targetContent)
except Exception as e:
return False
self.__parseConfigurationFile()
self.__dropDatabase()
self.__importDatabase(targetSQL)
self.__initPeersList()
return True
def deleteBackup(self, backupFileName: str) -> bool:
backups = list(map(lambda x : x['filename'], self.getBackups()))
if backupFileName not in backups:
return False
try:
os.remove(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backupFileName))
except Exception as e:
return False
return True
def downloadBackup(self, backupFileName: str) -> tuple[bool, str] | tuple[bool, None]:
backup = list(filter(lambda x : x['filename'] == backupFileName, self.getBackups()))
if len(backup) == 0:
return False, None
zip = f'{str(uuid.UUID(int=random.Random().getrandbits(128), version=4))}.zip'
with ZipFile(os.path.join('download', zip), 'w') as zipF:
zipF.write(
os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backup[0]['filename']),
os.path.basename(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backup[0]['filename']))
)
if backup[0]['database']:
zipF.write(
os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backup[0]['filename'].replace('.conf', '.sql')),
os.path.basename(os.path.join(self.__getProtocolPath(), 'WGDashboard_Backup', backup[0]['filename'].replace('.conf', '.sql')))
)
return True, zip
def updateConfigurationSettings(self, newData: dict) -> tuple[bool, str]:
if self.Status:
self.toggleConfiguration()
original = []
dataChanged = False
with open(self.configPath, 'r') as f:
original = [l.rstrip("\n") for l in f.readlines()]
allowEdit = ["Address", "PreUp", "PostUp", "PreDown", "PostDown", "ListenPort"]
if self.Protocol == 'awg':
allowEdit += ["Jc", "Jmin", "Jmax", "S1", "S2", "H1", "H2", "H3", "H4"]
start = original.index("[Interface]")
try:
end = original.index("[Peer]")
except ValueError as e:
end = len(original)
new = ["[Interface]"]
peerFound = False
for line in range(start, end):
split = re.split(r'\s*=\s*', original[line], 1)
if len(split) == 2:
if split[0] not in allowEdit:
new.append(original[line])
for key in allowEdit:
new.insert(1, f"{key} = {str(newData[key]).strip()}")
new.append("")
for line in range(end, len(original)):
new.append(original[line])
self.backupConfigurationFile()
with open(self.configPath, 'w') as f:
f.write("\n".join(new))
status, msg = self.toggleConfiguration()
if not status:
return False, msg
for i in allowEdit:
if isinstance(getattr(self, i), bool):
setattr(self, i, _strToBool(newData[i]))
else:
setattr(self, i, str(newData[i]))
return True, ""
def deleteConfiguration(self):
if self.getStatus():
self.toggleConfiguration()
os.remove(self.configPath)
self.__dropDatabase()
return True
def renameConfiguration(self, newConfigurationName) -> tuple[bool, str]:
if newConfigurationName in WireguardConfigurations.keys():
return False, "Configuration name already exist"
try:
if self.getStatus():
self.toggleConfiguration()
self.createDatabase(newConfigurationName)
sqlUpdate(f'INSERT INTO "{newConfigurationName}" SELECT * FROM "{self.Name}"')
sqlUpdate(f'INSERT INTO "{newConfigurationName}_restrict_access" SELECT * FROM "{self.Name}_restrict_access"')
sqlUpdate(f'INSERT INTO "{newConfigurationName}_deleted" SELECT * FROM "{self.Name}_deleted"')
sqlUpdate(f'INSERT INTO "{newConfigurationName}_transfer" SELECT * FROM "{self.Name}_transfer"')
AllPeerJobs.updateJobConfigurationName(self.Name, newConfigurationName)
shutil.copy(
self.configPath,
os.path.join(self.__getProtocolPath(), f'{newConfigurationName}.conf')
)
self.deleteConfiguration()
except Exception as e:
return False, str(e)
return True, None
def getNumberOfAvailableIP(self):
if len(self.Address) < 0:
return False, None
existedAddress = set()
availableAddress = {}
for p in self.Peers + self.getRestrictedPeersList():
peerAllowedIP = p.allowed_ip.split(',')
for pip in peerAllowedIP:
ppip = pip.strip().split('/')
if len(ppip) == 2:
try:
check = ipaddress.ip_network(ppip[0])
existedAddress.add(check)
except Exception as e:
print(f"[WGDashboard] Error: {self.Name} peer {p.id} have invalid ip")
configurationAddresses = self.Address.split(',')
for ca in configurationAddresses:
ca = ca.strip()
caSplit = ca.split('/')
try:
if len(caSplit) == 2:
network = ipaddress.ip_network(ca, False)
existedAddress.add(ipaddress.ip_network(caSplit[0]))
availableAddress[ca] = network.num_addresses
for p in existedAddress:
if p.version == network.version and p.subnet_of(network):
availableAddress[ca] -= 1
except Exception as e:
print(e)
print(f"[WGDashboard] Error: Failed to parse IP address {ca} from {self.Name}")
return True, availableAddress
def getAvailableIP(self, threshold = 255):
if len(self.Address) < 0:
return False, None
existedAddress = set()
availableAddress = {}
for p in self.Peers + self.getRestrictedPeersList():
peerAllowedIP = p.allowed_ip.split(',')
for pip in peerAllowedIP:
ppip = pip.strip().split('/')
if len(ppip) == 2:
try:
check = ipaddress.ip_network(ppip[0])
existedAddress.add(check.compressed)
except Exception as e:
print(f"[WGDashboard] Error: {self.Name} peer {p.id} have invalid ip")
configurationAddresses = self.Address.split(',')
for ca in configurationAddresses:
ca = ca.strip()
caSplit = ca.split('/')
try:
if len(caSplit) == 2:
network = ipaddress.ip_network(ca, False)
existedAddress.add(ipaddress.ip_network(caSplit[0]).compressed)
if threshold == -1:
availableAddress[ca] = filter(lambda ip : ip not in existedAddress,
map(lambda iph : ipaddress.ip_network(iph).compressed, network.hosts()))
else:
availableAddress[ca] = list(islice(filter(lambda ip : ip not in existedAddress,
map(lambda iph : ipaddress.ip_network(iph).compressed, network.hosts())), threshold))
except Exception as e:
print(e)
print(f"[WGDashboard] Error: Failed to parse IP address {ca} from {self.Name}")
print("Generated IP")
return True, availableAddress
def getRealtimeTrafficUsage(self):
stats = psutil.net_io_counters(pernic=True, nowrap=True)
if self.Name in stats.keys():
stat = stats[self.Name]
recv1 = stat.bytes_recv
sent1 = stat.bytes_sent
time.sleep(1)
stats = psutil.net_io_counters(pernic=True, nowrap=True)
if self.Name in stats.keys():
stat = stats[self.Name]
recv2 = stat.bytes_recv
sent2 = stat.bytes_sent
net_in = round((recv2 - recv1) / 1024 / 1024, 3)
net_out = round((sent2 - sent1) / 1024 / 1024, 3)
return {
"sent": net_out,
"recv": net_in
}
else:
return { "sent": 0, "recv": 0 }
else:
return { "sent": 0, "recv": 0 }
"""
AmneziaWG Configuration
"""
class AmneziaWireguardConfiguration(WireguardConfiguration):
def __init__(self, name: str = None, data: dict = None, backup: dict = None, startup: bool = False):
self.Jc = 0
self.Jmin = 0
self.Jmax = 0
self.S1 = 0
self.S2 = 0
self.H1 = 1
self.H2 = 2
self.H3 = 3
self.H4 = 4
super().__init__(name, data, backup, startup, wg=False)
def toJson(self):
self.Status = self.getStatus()
return {
"Status": self.Status,
"Name": self.Name,
"PrivateKey": self.PrivateKey,
"PublicKey": self.PublicKey,
"Address": self.Address,
"ListenPort": self.ListenPort,
"PreUp": self.PreUp,
"PreDown": self.PreDown,
"PostUp": self.PostUp,
"PostDown": self.PostDown,
"SaveConfig": self.SaveConfig,
"DataUsage": {
"Total": sum(list(map(lambda x: x.cumu_data + x.total_data, self.Peers))),
"Sent": sum(list(map(lambda x: x.cumu_sent + x.total_sent, self.Peers))),
"Receive": sum(list(map(lambda x: x.cumu_receive + x.total_receive, self.Peers)))
},
"ConnectedPeers": len(list(filter(lambda x: x.status == "running", self.Peers))),
"TotalPeers": len(self.Peers),
"Protocol": self.Protocol,
"Jc": self.Jc,
"Jmin": self.Jmin,
"Jmax": self.Jmax,
"S1": self.S1,
"S2": self.S2,
"H1": self.H1,
"H2": self.H2,
"H3": self.H3,
"H4": self.H4
}
def createDatabase(self, dbName = None):
if dbName is None:
dbName = self.Name
existingTables = sqlSelect("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
existingTables = [t['name'] for t in existingTables]
if dbName not in existingTables:
sqlUpdate(
"""
CREATE TABLE '%s'(
id VARCHAR NOT NULL, private_key VARCHAR NULL, DNS VARCHAR NULL, advanced_security VARCHAR NULL,
endpoint_allowed_ip VARCHAR NULL, name VARCHAR NULL, total_receive FLOAT NULL,
total_sent FLOAT NULL, total_data FLOAT NULL, endpoint VARCHAR NULL,
status VARCHAR NULL, latest_handshake VARCHAR NULL, allowed_ip VARCHAR NULL,
cumu_receive FLOAT NULL, cumu_sent FLOAT NULL, cumu_data FLOAT NULL, mtu INT NULL,
keepalive INT NULL, remote_endpoint VARCHAR NULL, preshared_key VARCHAR NULL,
PRIMARY KEY (id)
)
""" % dbName
)
if f'{dbName}_restrict_access' not in existingTables:
sqlUpdate(
"""
CREATE TABLE '%s_restrict_access' (
id VARCHAR NOT NULL, private_key VARCHAR NULL, DNS VARCHAR NULL, advanced_security VARCHAR NULL,
endpoint_allowed_ip VARCHAR NULL, name VARCHAR NULL, total_receive FLOAT NULL,
total_sent FLOAT NULL, total_data FLOAT NULL, endpoint VARCHAR NULL,
status VARCHAR NULL, latest_handshake VARCHAR NULL, allowed_ip VARCHAR NULL,
cumu_receive FLOAT NULL, cumu_sent FLOAT NULL, cumu_data FLOAT NULL, mtu INT NULL,
keepalive INT NULL, remote_endpoint VARCHAR NULL, preshared_key VARCHAR NULL,
PRIMARY KEY (id)
)
""" % dbName
)
if f'{dbName}_transfer' not in existingTables:
sqlUpdate(
"""
CREATE TABLE '%s_transfer' (
id VARCHAR NOT NULL, total_receive FLOAT NULL,
total_sent FLOAT NULL, total_data FLOAT NULL,
cumu_receive FLOAT NULL, cumu_sent FLOAT NULL, cumu_data FLOAT NULL, time DATETIME
)
""" % dbName
)
if f'{dbName}_deleted' not in existingTables:
sqlUpdate(
"""
CREATE TABLE '%s_deleted' (
id VARCHAR NOT NULL, private_key VARCHAR NULL, DNS VARCHAR NULL, advanced_security VARCHAR NULL,
endpoint_allowed_ip VARCHAR NULL, name VARCHAR NULL, total_receive FLOAT NULL,
total_sent FLOAT NULL, total_data FLOAT NULL, endpoint VARCHAR NULL,
status VARCHAR NULL, latest_handshake VARCHAR NULL, allowed_ip VARCHAR NULL,
cumu_receive FLOAT NULL, cumu_sent FLOAT NULL, cumu_data FLOAT NULL, mtu INT NULL,
keepalive INT NULL, remote_endpoint VARCHAR NULL, preshared_key VARCHAR NULL,
PRIMARY KEY (id)
)
""" % dbName
)
def getPeers(self):
if self.configurationFileChanged():
self.Peers = []
with open(self.configPath, 'r') as configFile:
p = []
pCounter = -1
content = configFile.read().split('\n')
try:
peerStarts = content.index("[Peer]")
content = content[peerStarts:]
for i in content:
if not RegexMatch("#(.*)", i) and not RegexMatch(";(.*)", i):
if i == "[Peer]":
pCounter += 1
p.append({})
p[pCounter]["name"] = ""
else:
if len(i) > 0:
split = re.split(r'\s*=\s*', i, 1)
if len(split) == 2:
p[pCounter][split[0]] = split[1]
if RegexMatch("#Name# = (.*)", i):
split = re.split(r'\s*=\s*', i, 1)
if len(split) == 2:
p[pCounter]["name"] = split[1]
for i in p:
if "PublicKey" in i.keys():
checkIfExist = sqlSelect("SELECT * FROM '%s' WHERE id = ?" % self.Name,
((i['PublicKey']),)).fetchone()
if checkIfExist is None:
newPeer = {
"id": i['PublicKey'],
"advanced_security": i.get('AdvancedSecurity', 'off'),
"private_key": "",
"DNS": DashboardConfig.GetConfig("Peers", "peer_global_DNS")[1],
"endpoint_allowed_ip": DashboardConfig.GetConfig("Peers", "peer_endpoint_allowed_ip")[
1],
"name": i.get("name"),
"total_receive": 0,
"total_sent": 0,
"total_data": 0,
"endpoint": "N/A",
"status": "stopped",
"latest_handshake": "N/A",
"allowed_ip": i.get("AllowedIPs", "N/A"),
"cumu_receive": 0,
"cumu_sent": 0,
"cumu_data": 0,
"traffic": [],
"mtu": DashboardConfig.GetConfig("Peers", "peer_mtu")[1],
"keepalive": DashboardConfig.GetConfig("Peers", "peer_keep_alive")[1],
"remote_endpoint": DashboardConfig.GetConfig("Peers", "remote_endpoint")[1],
"preshared_key": i["PresharedKey"] if "PresharedKey" in i.keys() else ""
}
sqlUpdate(
"""
INSERT INTO '%s'
VALUES (:id, :private_key, :DNS, :advanced_security, :endpoint_allowed_ip, :name, :total_receive, :total_sent,
:total_data, :endpoint, :status, :latest_handshake, :allowed_ip, :cumu_receive, :cumu_sent,
:cumu_data, :mtu, :keepalive, :remote_endpoint, :preshared_key);
""" % self.Name
, newPeer)
self.Peers.append(AmneziaWGPeer(newPeer, self))
else:
sqlUpdate("UPDATE '%s' SET allowed_ip = ? WHERE id = ?" % self.Name,
(i.get("AllowedIPs", "N/A"), i['PublicKey'],))
self.Peers.append(AmneziaWGPeer(checkIfExist, self))
except Exception as e:
if __name__ == '__main__':
print(f"[WGDashboard] {self.Name} Error: {str(e)}")
else:
self.Peers.clear()
checkIfExist = sqlSelect("SELECT * FROM '%s'" % self.Name).fetchall()
for i in checkIfExist:
self.Peers.append(AmneziaWGPeer(i, self))
def addPeers(self, peers: list) -> tuple[bool, dict]:
result = {
"message": None,
"peers": []
}
try:
for i in peers:
newPeer = {
"id": i['id'],
"private_key": i['private_key'],
"DNS": i['DNS'],
"endpoint_allowed_ip": i['endpoint_allowed_ip'],
"name": i['name'],
"total_receive": 0,
"total_sent": 0,
"total_data": 0,
"endpoint": "N/A",
"status": "stopped",
"latest_handshake": "N/A",
"allowed_ip": i.get("allowed_ip", "N/A"),
"cumu_receive": 0,
"cumu_sent": 0,
"cumu_data": 0,
"traffic": [],
"mtu": i['mtu'],
"keepalive": i['keepalive'],
"remote_endpoint": DashboardConfig.GetConfig("Peers", "remote_endpoint")[1],
"preshared_key": i["preshared_key"],
"advanced_security": i['advanced_security']
}
sqlUpdate(
"""
INSERT INTO '%s'
VALUES (:id, :private_key, :DNS, :advanced_security, :endpoint_allowed_ip, :name, :total_receive, :total_sent,
:total_data, :endpoint, :status, :latest_handshake, :allowed_ip, :cumu_receive, :cumu_sent,
:cumu_data, :mtu, :keepalive, :remote_endpoint, :preshared_key);
""" % self.Name
, newPeer)
for p in peers:
presharedKeyExist = len(p['preshared_key']) > 0
rd = random.Random()
uid = str(uuid.UUID(int=rd.getrandbits(128), version=4))
if presharedKeyExist:
with open(uid, "w+") as f:
f.write(p['preshared_key'])
subprocess.check_output(
f"{self.Protocol} set {self.Name} peer {p['id']} allowed-ips {p['allowed_ip'].replace(' ', '')}{f' preshared-key {uid}' if presharedKeyExist else ''} advanced-security {p['advanced_security']}",
shell=True, stderr=subprocess.STDOUT)
if presharedKeyExist:
os.remove(uid)
subprocess.check_output(
f"{self.Protocol}-quick save {self.Name}", shell=True, stderr=subprocess.STDOUT)
self.getPeersList()
for p in peers:
p = self.searchPeer(p['id'])
if p[0]:
result['peers'].append(p[1])
return True, result
except Exception as e:
result['message'] = str(e)
return False, result
def getRestrictedPeers(self):
self.RestrictedPeers = []
restricted = sqlSelect("SELECT * FROM '%s_restrict_access'" % self.Name).fetchall()
for i in restricted:
self.RestrictedPeers.append(AmneziaWGPeer(i, self))
"""
Peer
"""
class Peer:
def __init__(self, tableData, configuration: WireguardConfiguration):
self.configuration = configuration
self.id = tableData["id"]
self.private_key = tableData["private_key"]
self.DNS = tableData["DNS"]
self.endpoint_allowed_ip = tableData["endpoint_allowed_ip"]
self.name = tableData["name"]
self.total_receive = tableData["total_receive"]
self.total_sent = tableData["total_sent"]
self.total_data = tableData["total_data"]
self.endpoint = tableData["endpoint"]
self.status = tableData["status"]
self.latest_handshake = tableData["latest_handshake"]
self.allowed_ip = tableData["allowed_ip"]
self.cumu_receive = tableData["cumu_receive"]
self.cumu_sent = tableData["cumu_sent"]
self.cumu_data = tableData["cumu_data"]
self.mtu = tableData["mtu"]
self.keepalive = tableData["keepalive"]
self.remote_endpoint = tableData["remote_endpoint"]
self.preshared_key = tableData["preshared_key"]
self.jobs: list[PeerJob] = []
self.ShareLink: list[PeerShareLink] = []
self.getJobs()
self.getShareLink()
def toJson(self):
self.getJobs()
self.getShareLink()
return self.__dict__
def __repr__(self):
return str(self.toJson())
def updatePeer(self, name: str, private_key: str,
preshared_key: str,
dns_addresses: str, allowed_ip: str, endpoint_allowed_ip: str, mtu: int,
keepalive: int) -> ResponseObject:
if not self.configuration.getStatus():
self.configuration.toggleConfiguration()
existingAllowedIps = [item for row in list(
map(lambda x: [q.strip() for q in x.split(',')],
map(lambda y: y.allowed_ip,
list(filter(lambda k: k.id != self.id, self.configuration.getPeersList()))))) for item in row]
if allowed_ip in existingAllowedIps:
return ResponseObject(False, "Allowed IP already taken by another peer")
if not ValidateIPAddressesWithRange(endpoint_allowed_ip):
return ResponseObject(False, f"Endpoint Allowed IPs format is incorrect")
if len(dns_addresses) > 0 and not ValidateDNSAddress(dns_addresses):
return ResponseObject(False, f"DNS format is incorrect")
if mtu < 0 or mtu > 1460:
return ResponseObject(False, "MTU format is not correct")
if keepalive < 0:
return ResponseObject(False, "Persistent Keepalive format is not correct")
if len(private_key) > 0:
pubKey = GenerateWireguardPublicKey(private_key)
if not pubKey[0] or pubKey[1] != self.id:
return ResponseObject(False, "Private key does not match with the public key")
try:
rd = random.Random()
uid = str(uuid.UUID(int=rd.getrandbits(128), version=4))
pskExist = len(preshared_key) > 0
if pskExist:
with open(uid, "w+") as f:
f.write(preshared_key)
newAllowedIPs = allowed_ip.replace(" ", "")
updateAllowedIp = subprocess.check_output(
f"{self.configuration.Protocol} set {self.configuration.Name} peer {self.id} allowed-ips {newAllowedIPs} {f'preshared-key {uid}' if pskExist else 'preshared-key /dev/null'}",
shell=True, stderr=subprocess.STDOUT)
if pskExist: os.remove(uid)
if len(updateAllowedIp.decode().strip("\n")) != 0:
return ResponseObject(False,
"Update peer failed when updating Allowed IPs")
saveConfig = subprocess.check_output(f"{self.configuration.Protocol}-quick save {self.configuration.Name}",
shell=True, stderr=subprocess.STDOUT)
if f"wg showconf {self.configuration.Name}" not in saveConfig.decode().strip('\n'):
return ResponseObject(False,
"Update peer failed when saving the configuration")
sqlUpdate(
'''UPDATE '%s' SET name = ?, private_key = ?, DNS = ?, endpoint_allowed_ip = ?, mtu = ?,
keepalive = ?, preshared_key = ? WHERE id = ?''' % self.configuration.Name,
(name, private_key, dns_addresses, endpoint_allowed_ip, mtu,
keepalive, preshared_key, self.id,)
)
return ResponseObject()
except subprocess.CalledProcessError as exc:
return ResponseObject(False, exc.output.decode("UTF-8").strip())
def downloadPeer(self) -> dict[str, str]:
filename = self.name
if len(filename) == 0:
filename = "UntitledPeer"
filename = "".join(filename.split(' '))
filename = f"{filename}"
illegal_filename = [".", ",", "/", "?", "<", ">", "\\", ":", "*", '|' '\"', "com1", "com2", "com3",
"com4", "com5", "com6", "com7", "com8", "com9", "lpt1", "lpt2", "lpt3", "lpt4",
"lpt5", "lpt6", "lpt7", "lpt8", "lpt9", "con", "nul", "prn"]
for i in illegal_filename:
filename = filename.replace(i, "")
finalFilename = ""
for i in filename:
if re.match("^[a-zA-Z0-9_=+.-]$", i):
finalFilename += i
peerConfiguration = f'''[Interface]
PrivateKey = {self.private_key}
Address = {self.allowed_ip}
MTU = {str(self.mtu)}
'''
if len(self.DNS) > 0:
peerConfiguration += f"DNS = {self.DNS}\n"
peerConfiguration += f'''
[Peer]
PublicKey = {self.configuration.PublicKey}
AllowedIPs = {self.endpoint_allowed_ip}
Endpoint = {DashboardConfig.GetConfig("Peers", "remote_endpoint")[1]}:{self.configuration.ListenPort}
PersistentKeepalive = {str(self.keepalive)}
'''
if len(self.preshared_key) > 0:
peerConfiguration += f"PresharedKey = {self.preshared_key}\n"
return {
"fileName": finalFilename,
"file": peerConfiguration
}
def getJobs(self):
self.jobs = AllPeerJobs.searchJob(self.configuration.Name, self.id)
def getShareLink(self):
self.ShareLink = AllPeerShareLinks.getLink(self.configuration.Name, self.id)
def resetDataUsage(self, type):
try:
if type == "total":
sqlUpdate("UPDATE '%s' SET total_data = 0, cumu_data = 0, total_receive = 0, cumu_receive = 0, total_sent = 0, cumu_sent = 0 WHERE id = ?" % self.configuration.Name, (self.id, ))
self.total_data = 0
self.total_receive = 0
self.total_sent = 0
self.cumu_data = 0
self.cumu_sent = 0
self.cumu_receive = 0
elif type == "receive":
sqlUpdate("UPDATE '%s' SET total_receive = 0, cumu_receive = 0 WHERE id = ?" % self.configuration.Name, (self.id, ))
self.cumu_receive = 0
self.total_receive = 0
elif type == "sent":
sqlUpdate("UPDATE '%s' SET total_sent = 0, cumu_sent = 0 WHERE id = ?" % self.configuration.Name, (self.id, ))
self.cumu_sent = 0
self.total_sent = 0
else:
return False
except Exception as e:
print(e)
return False
return True
class AmneziaWGPeer(Peer):
def __init__(self, tableData, configuration: AmneziaWireguardConfiguration):
self.advanced_security = tableData["advanced_security"]
super().__init__(tableData, configuration)
def downloadPeer(self) -> dict[str, str]:
filename = self.name
if len(filename) == 0:
filename = "UntitledPeer"
filename = "".join(filename.split(' '))
filename = f"{filename}_{self.configuration.Name}"
illegal_filename = [".", ",", "/", "?", "<", ">", "\\", ":", "*", '|' '\"', "com1", "com2", "com3",
"com4", "com5", "com6", "com7", "com8", "com9", "lpt1", "lpt2", "lpt3", "lpt4",
"lpt5", "lpt6", "lpt7", "lpt8", "lpt9", "con", "nul", "prn"]
for i in illegal_filename:
filename = filename.replace(i, "")
finalFilename = ""
for i in filename:
if re.match("^[a-zA-Z0-9_=+.-]$", i):
finalFilename += i
peerConfiguration = f'''[Interface]
PrivateKey = {self.private_key}
Address = {self.allowed_ip}
MTU = {str(self.mtu)}
Jc = {self.configuration.Jc}
Jmin = {self.configuration.Jmin}
Jmax = {self.configuration.Jmax}
S1 = {self.configuration.S1}
S2 = {self.configuration.S2}
H1 = {self.configuration.H1}
H2 = {self.configuration.H2}
H3 = {self.configuration.H3}
H4 = {self.configuration.H4}
'''
if len(self.DNS) > 0:
peerConfiguration += f"DNS = {self.DNS}\n"
peerConfiguration += f'''
[Peer]
PublicKey = {self.configuration.PublicKey}
AllowedIPs = {self.endpoint_allowed_ip}
Endpoint = {DashboardConfig.GetConfig("Peers", "remote_endpoint")[1]}:{self.configuration.ListenPort}
PersistentKeepalive = {str(self.keepalive)}
'''
if len(self.preshared_key) > 0:
peerConfiguration += f"PresharedKey = {self.preshared_key}\n"
return {
"fileName": finalFilename,
"file": peerConfiguration
}
def updatePeer(self, name: str, private_key: str,
preshared_key: str,
dns_addresses: str, allowed_ip: str, endpoint_allowed_ip: str, mtu: int,
keepalive: int, advanced_security: str) -> ResponseObject:
if not self.configuration.getStatus():
self.configuration.toggleConfiguration()
existingAllowedIps = [item for row in list(
map(lambda x: [q.strip() for q in x.split(',')],
map(lambda y: y.allowed_ip,
list(filter(lambda k: k.id != self.id, self.configuration.getPeersList()))))) for item in row]
if allowed_ip in existingAllowedIps:
return ResponseObject(False, "Allowed IP already taken by another peer")
if not ValidateIPAddressesWithRange(endpoint_allowed_ip):
return ResponseObject(False, f"Endpoint Allowed IPs format is incorrect")
if len(dns_addresses) > 0 and not ValidateDNSAddress(dns_addresses):
return ResponseObject(False, f"DNS format is incorrect")
if mtu < 0 or mtu > 1460:
return ResponseObject(False, "MTU format is not correct")
if keepalive < 0:
return ResponseObject(False, "Persistent Keepalive format is not correct")
if advanced_security != "on" and advanced_security != "off":
return ResponseObject(False, "Advanced Security can only be on or off")
if len(private_key) > 0:
pubKey = GenerateWireguardPublicKey(private_key)
if not pubKey[0] or pubKey[1] != self.id:
return ResponseObject(False, "Private key does not match with the public key")
try:
rd = random.Random()
uid = str(uuid.UUID(int=rd.getrandbits(128), version=4))
pskExist = len(preshared_key) > 0
if pskExist:
with open(uid, "w+") as f:
f.write(preshared_key)
newAllowedIPs = allowed_ip.replace(" ", "")
updateAllowedIp = subprocess.check_output(
f"{self.configuration.Protocol} set {self.configuration.Name} peer {self.id} allowed-ips {newAllowedIPs} {f'preshared-key {uid}' if pskExist else 'preshared-key /dev/null'} advanced-security {advanced_security}",
shell=True, stderr=subprocess.STDOUT)
if pskExist: os.remove(uid)
if len(updateAllowedIp.decode().strip("\n")) != 0:
return ResponseObject(False,
"Update peer failed when updating Allowed IPs")
saveConfig = subprocess.check_output(f"{self.configuration.Protocol}-quick save {self.configuration.Name}",
shell=True, stderr=subprocess.STDOUT)
if f"wg showconf {self.configuration.Name}" not in saveConfig.decode().strip('\n'):
return ResponseObject(False,
"Update peer failed when saving the configuration")
sqlUpdate(
'''UPDATE '%s' SET name = ?, private_key = ?, DNS = ?, endpoint_allowed_ip = ?, mtu = ?,
keepalive = ?, preshared_key = ?, advanced_security = ? WHERE id = ?''' % self.configuration.Name,
(name, private_key, dns_addresses, endpoint_allowed_ip, mtu,
keepalive, preshared_key, advanced_security, self.id,)
)
return ResponseObject()
except subprocess.CalledProcessError as exc:
return ResponseObject(False, exc.output.decode("UTF-8").strip())
"""
Dashboard API Key
"""
class DashboardAPIKey:
def __init__(self, Key: str, CreatedAt: str, ExpiredAt: str):
self.Key = Key
self.CreatedAt = CreatedAt
self.ExpiredAt = ExpiredAt
def toJson(self):
return self.__dict__
"""
Dashboard Configuration
"""
class DashboardConfig:
def __init__(self):
if not os.path.exists(DASHBOARD_CONF):
open(DASHBOARD_CONF, "x")
self.__config = configparser.ConfigParser(strict=False)
self.__config.read_file(open(DASHBOARD_CONF, "r+"))
self.hiddenAttribute = ["totp_key", "auth_req"]
self.__default = {
"Account": {
"username": "admin",
"password": "admin",
"enable_totp": "false",
"totp_verified": "false",
"totp_key": pyotp.random_base32()
},
"Server": {
"wg_conf_path": "/etc/wireguard",
"awg_conf_path": "/etc/amnezia/amneziawg",
"app_prefix": "",
"app_ip": "0.0.0.0",
"app_port": "10086",
"auth_req": "true",
"version": DASHBOARD_VERSION,
"dashboard_refresh_interval": "60000",
"dashboard_peer_list_display": "grid",
"dashboard_sort": "status",
"dashboard_theme": "dark",
"dashboard_api_key": "false",
"dashboard_language": "en"
},
"Peers": {
"peer_global_DNS": "1.1.1.1",
"peer_endpoint_allowed_ip": "0.0.0.0/0",
"peer_display_mode": "grid",
"remote_endpoint": GetRemoteEndpoint(),
"peer_MTU": "1420",
"peer_keep_alive": "21"
},
"Other": {
"welcome_session": "true"
},
"Database":{
"type": "sqlite"
},
"Email":{
"server": "",
"port": "",
"encryption": "",
"username": "",
"email_password": "",
"send_from": "",
"email_template": ""
},
"WireGuardConfiguration": {
"autostart": ""
}
}
for section, keys in self.__default.items():
for key, value in keys.items():
exist, currentData = self.GetConfig(section, key)
if not exist:
self.SetConfig(section, key, value, True)
self.__createAPIKeyTable()
self.DashboardAPIKeys = self.__getAPIKeys()
self.APIAccessed = False
self.SetConfig("Server", "version", DASHBOARD_VERSION)
def __createAPIKeyTable(self):
existingTable = sqlSelect("SELECT name FROM sqlite_master WHERE type='table' AND name = 'DashboardAPIKeys'").fetchall()
if len(existingTable) == 0:
sqlUpdate("CREATE TABLE DashboardAPIKeys (Key VARCHAR NOT NULL PRIMARY KEY, CreatedAt DATETIME NOT NULL DEFAULT (datetime('now', 'localtime')), ExpiredAt VARCHAR)")
def __getAPIKeys(self) -> list[DashboardAPIKey]:
keys = sqlSelect("SELECT * FROM DashboardAPIKeys WHERE ExpiredAt IS NULL OR ExpiredAt > datetime('now', 'localtime') ORDER BY CreatedAt DESC").fetchall()
fKeys = []
for k in keys:
fKeys.append(DashboardAPIKey(*k))
return fKeys
def createAPIKeys(self, ExpiredAt = None):
newKey = secrets.token_urlsafe(32)
sqlUpdate('INSERT INTO DashboardAPIKeys (Key, ExpiredAt) VALUES (?, ?)', (newKey, ExpiredAt,))
self.DashboardAPIKeys = self.__getAPIKeys()
def deleteAPIKey(self, key):
sqlUpdate("UPDATE DashboardAPIKeys SET ExpiredAt = datetime('now', 'localtime') WHERE Key = ?", (key, ))
self.DashboardAPIKeys = self.__getAPIKeys()
def __configValidation(self, section : str, key: str, value: Any) -> [bool, str]:
if (type(value) is str and len(value) == 0
and section not in ['Email', 'WireGuardConfiguration'] and
(section == 'Peer' and key == 'peer_global_dns')):
return False, "Field cannot be empty!"
if section == "Peers" and key == "peer_global_dns" and len(value) > 0:
return ValidateDNSAddress(value)
if section == "Peers" and key == "peer_endpoint_allowed_ip":
value = value.split(",")
for i in value:
i = i.strip()
try:
ipaddress.ip_network(i, strict=False)
except Exception as e:
return False, str(e)
if section == "Server" and key == "wg_conf_path":
if not os.path.exists(value):
return False, f"{value} is not a valid path"
if section == "Account" and key == "password":
if self.GetConfig("Account", "password")[0]:
if not self.__checkPassword(
value["currentPassword"], self.GetConfig("Account", "password")[1].encode("utf-8")):
return False, "Current password does not match."
if value["newPassword"] != value["repeatNewPassword"]:
return False, "New passwords does not match"
return True, ""
def generatePassword(self, plainTextPassword: str):
return bcrypt.hashpw(plainTextPassword.encode("utf-8"), bcrypt.gensalt())
def __checkPassword(self, plainTextPassword: str, hashedPassword: bytes):
return bcrypt.checkpw(plainTextPassword.encode("utf-8"), hashedPassword)
def SetConfig(self, section: str, key: str, value: any, init: bool = False) -> [bool, str]:
if key in self.hiddenAttribute and not init:
return False, None
if not init:
valid, msg = self.__configValidation(section, key, value)
if not valid:
return False, msg
if section == "Account" and key == "password":
if not init:
value = self.generatePassword(value["newPassword"]).decode("utf-8")
else:
value = self.generatePassword(value).decode("utf-8")
if section == "Email" and key == "email_template":
value = value.encode('unicode_escape').decode('utf-8')
if section == "Server" and key == "wg_conf_path":
if not os.path.exists(value):
return False, "Path does not exist"
if section not in self.__config:
if init:
self.__config[section] = {}
else:
return False, "Section does not exist"
if ((key not in self.__config[section].keys() and init) or
(key in self.__config[section].keys())):
if type(value) is bool:
if value:
self.__config[section][key] = "true"
else:
self.__config[section][key] = "false"
elif type(value) in [int, float]:
self.__config[section][key] = str(value)
elif type(value) is list:
self.__config[section][key] = "||".join(value).strip("||")
else:
self.__config[section][key] = value
return self.SaveConfig(), ""
else:
return False, f"{key} does not exist under {section}"
return True, ""
def SaveConfig(self) -> bool:
try:
with open(DASHBOARD_CONF, "w+", encoding='utf-8') as configFile:
self.__config.write(configFile)
return True
except Exception as e:
return False
def GetConfig(self, section, key) -> [bool, any]:
if section not in self.__config:
return False, None
if key not in self.__config[section]:
return False, None
if section == "Email" and key == "email_template":
return True, self.__config[section][key].encode('utf-8').decode('unicode_escape')
if section == "WireGuardConfiguration" and key == "autostart":
return True, list(filter(lambda x: len(x) > 0, self.__config[section][key].split("||")))
if self.__config[section][key] in ["1", "yes", "true", "on"]:
return True, True
if self.__config[section][key] in ["0", "no", "false", "off"]:
return True, False
return True, self.__config[section][key]
def toJson(self) -> dict[str, dict[Any, Any]]:
the_dict = {}
for section in self.__config.sections():
the_dict[section] = {}
for key, val in self.__config.items(section):
if key not in self.hiddenAttribute:
the_dict[section][key] = self.GetConfig(section, key)[1]
return the_dict
"""
Database Connection Functions
"""
sqldb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard.db'), check_same_thread=False)
sqldb.row_factory = sqlite3.Row
def sqlSelect(statement: str, paramters: tuple = ()) -> sqlite3.Cursor:
result = []
try:
cursor = sqldb.cursor()
result = cursor.execute(statement, paramters)
except Exception as error:
print("[WGDashboard] SQLite Error:" + str(error) + " | Statement: " + statement)
return result
def sqlUpdate(statement: str, paramters: tuple = ()) -> sqlite3.Cursor:
sqldb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard.db'))
sqldb.row_factory = sqlite3.Row
cursor = sqldb.cursor()
with sqldb:
cursor = sqldb.cursor()
try:
statement = statement.rstrip(';')
s = f'BEGIN TRANSACTION;{statement};END TRANSACTION;'
cursor.execute(statement, paramters)
# sqldb.commit()
except Exception as error:
print("[WGDashboard] SQLite Error:" + str(error) + " | Statement: " + statement)
sqldb.close()
DashboardConfig = DashboardConfig()
EmailSender = EmailSender(DashboardConfig)
_, APP_PREFIX = DashboardConfig.GetConfig("Server", "app_prefix")
cors = CORS(app, resources={rf"{APP_PREFIX}/api/*": {
"origins": "*",
"methods": "DELETE, POST, GET, OPTIONS",
"allow_headers": ["Content-Type", "wg-dashboard-apikey"]
}})
'''
API Routes
'''
@app.before_request
def auth_req():
if request.method.lower() == 'options':
return ResponseObject(True)
DashboardConfig.APIAccessed = False
if "api" in request.path:
if str(request.method) == "GET":
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=str(request.args))
elif str(request.method) == "POST":
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Request Args: {str(request.args)} Body:{str(request.get_json())}")
authenticationRequired = DashboardConfig.GetConfig("Server", "auth_req")[1]
d = request.headers
if authenticationRequired:
apiKey = d.get('wg-dashboard-apikey')
apiKeyEnabled = DashboardConfig.GetConfig("Server", "dashboard_api_key")[1]
if apiKey is not None and len(apiKey) > 0 and apiKeyEnabled:
apiKeyExist = len(list(filter(lambda x : x.Key == apiKey, DashboardConfig.DashboardAPIKeys))) == 1
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"API Key Access: {('true' if apiKeyExist else 'false')} - Key: {apiKey}")
if not apiKeyExist:
DashboardConfig.APIAccessed = False
response = Flask.make_response(app, {
"status": False,
"message": "API Key does not exist",
"data": None
})
response.content_type = "application/json"
response.status_code = 401
return response
DashboardConfig.APIAccessed = True
else:
DashboardConfig.APIAccessed = False
whiteList = [
'/static/', 'validateAuthentication', 'authenticate', 'getDashboardConfiguration',
'getDashboardTheme', 'getDashboardVersion', 'sharePeer/get', 'isTotpEnabled', 'locale',
'/fileDownload'
]
if ("username" not in session
and (f"{(APP_PREFIX if len(APP_PREFIX) > 0 else '')}/" != request.path
and f"{(APP_PREFIX if len(APP_PREFIX) > 0 else '')}" != request.path)
and len(list(filter(lambda x : x not in request.path, whiteList))) == len(whiteList)
):
response = Flask.make_response(app, {
"status": False,
"message": "Unauthorized access.",
"data": None
})
response.content_type = "application/json"
response.status_code = 401
return response
@app.route(f'{APP_PREFIX}/api/handshake', methods=["GET", "OPTIONS"])
def API_Handshake():
return ResponseObject(True)
@app.get(f'{APP_PREFIX}/api/validateAuthentication')
def API_ValidateAuthentication():
token = request.cookies.get("authToken")
if DashboardConfig.GetConfig("Server", "auth_req")[1]:
if token is None or token == "" or "username" not in session or session["username"] != token:
return ResponseObject(False, "Invalid authentication.")
return ResponseObject(True)
@app.get(f'{APP_PREFIX}/api/requireAuthentication')
def API_RequireAuthentication():
return ResponseObject(data=DashboardConfig.GetConfig("Server", "auth_req")[1])
@app.post(f'{APP_PREFIX}/api/authenticate')
def API_AuthenticateLogin():
data = request.get_json()
if not DashboardConfig.GetConfig("Server", "auth_req")[1]:
return ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
if DashboardConfig.APIAccessed:
authToken = hashlib.sha256(f"{request.headers.get('wg-dashboard-apikey')}{datetime.now()}".encode()).hexdigest()
session['username'] = authToken
resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
resp.set_cookie("authToken", authToken)
session.permanent = True
return resp
valid = bcrypt.checkpw(data['password'].encode("utf-8"),
DashboardConfig.GetConfig("Account", "password")[1].encode("utf-8"))
totpEnabled = DashboardConfig.GetConfig("Account", "enable_totp")[1]
totpValid = False
if totpEnabled:
totpValid = pyotp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).now() == data['totp']
if (valid
and data['username'] == DashboardConfig.GetConfig("Account", "username")[1]
and ((totpEnabled and totpValid) or not totpEnabled)
):
authToken = hashlib.sha256(f"{data['username']}{datetime.now()}".encode()).hexdigest()
session['username'] = authToken
resp = ResponseObject(True, DashboardConfig.GetConfig("Other", "welcome_session")[1])
resp.set_cookie("authToken", authToken)
session.permanent = True
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login success: {data['username']}")
return resp
DashboardLogger.log(str(request.url), str(request.remote_addr), Message=f"Login failed: {data['username']}")
if totpEnabled:
return ResponseObject(False, "Sorry, your username, password or OTP is incorrect.")
else:
return ResponseObject(False, "Sorry, your username or password is incorrect.")
@app.get(f'{APP_PREFIX}/api/signout')
def API_SignOut():
resp = ResponseObject(True, "")
resp.delete_cookie("authToken")
session.clear()
return resp
@app.route(f'{APP_PREFIX}/api/getWireguardConfigurations', methods=["GET"])
def API_getWireguardConfigurations():
InitWireguardConfigurationsList()
return ResponseObject(data=[wc for wc in WireguardConfigurations.values()])
@app.route(f'{APP_PREFIX}/api/addWireguardConfiguration', methods=["POST"])
def API_addWireguardConfiguration():
data = request.get_json()
requiredKeys = [
"ConfigurationName", "Address", "ListenPort", "PrivateKey", "Protocol"
]
for i in requiredKeys:
if i not in data.keys():
return ResponseObject(False, "Please provide all required parameters.")
if data.get("Protocol") not in ProtocolsEnabled():
return ResponseObject(False, "Please provide a valid protocol: wg / awg.")
# Check duplicate names, ports, address
for i in WireguardConfigurations.values():
if i.Name == data['ConfigurationName']:
return ResponseObject(False,
f"Already have a configuration with the name \"{data['ConfigurationName']}\"",
"ConfigurationName")
if str(i.ListenPort) == str(data["ListenPort"]):
return ResponseObject(False,
f"Already have a configuration with the port \"{data['ListenPort']}\"",
"ListenPort")
if i.Address == data["Address"]:
return ResponseObject(False,
f"Already have a configuration with the address \"{data['Address']}\"",
"Address")
if "Backup" in data.keys():
path = {
"wg": DashboardConfig.GetConfig("Server", "wg_conf_path")[1],
"awg": DashboardConfig.GetConfig("Server", "awg_conf_path")[1]
}
if (os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"])) and
os.path.exists(os.path.join(path['wg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))):
protocol = "wg"
elif (os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"])) and
os.path.exists(os.path.join(path['awg'], 'WGDashboard_Backup', data["Backup"].replace('.conf', '.sql')))):
protocol = "awg"
else:
return ResponseObject(False, "Backup does not exist")
shutil.copy(
os.path.join(path[protocol], 'WGDashboard_Backup', data["Backup"]),
os.path.join(path[protocol], f'{data["ConfigurationName"]}.conf')
)
WireguardConfigurations[data['ConfigurationName']] = WireguardConfiguration(data=data, name=data['ConfigurationName']) if protocol == 'wg' else AmneziaWireguardConfiguration(data=data, name=data['ConfigurationName'])
else:
WireguardConfigurations[data['ConfigurationName']] = WireguardConfiguration(data=data) if data.get('Protocol') == 'wg' else AmneziaWireguardConfiguration(data=data)
return ResponseObject()
@app.get(f'{APP_PREFIX}/api/toggleWireguardConfiguration')
def API_toggleWireguardConfiguration():
configurationName = request.args.get('configurationName')
if configurationName is None or len(
configurationName) == 0 or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Please provide a valid configuration name", status_code=404)
toggleStatus, msg = WireguardConfigurations[configurationName].toggleConfiguration()
return ResponseObject(toggleStatus, msg, WireguardConfigurations[configurationName].Status)
@app.post(f'{APP_PREFIX}/api/updateWireguardConfiguration')
def API_updateWireguardConfiguration():
data = request.get_json()
requiredKeys = ["Name"]
for i in requiredKeys:
if i not in data.keys():
return ResponseObject(False, "Please provide these following field: " + ", ".join(requiredKeys))
name = data.get("Name")
if name not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist", status_code=404)
status, msg = WireguardConfigurations[name].updateConfigurationSettings(data)
return ResponseObject(status, message=msg, data=WireguardConfigurations[name])
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRawFile')
def API_GetWireguardConfigurationRawFile():
configurationName = request.args.get('configurationName')
if configurationName is None or len(
configurationName) == 0 or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Please provide a valid configuration name", status_code=404)
return ResponseObject(data={
"path": WireguardConfigurations[configurationName].configPath,
"content": WireguardConfigurations[configurationName].getRawConfigurationFile()
})
@app.post(f'{APP_PREFIX}/api/updateWireguardConfigurationRawFile')
def API_UpdateWireguardConfigurationRawFile():
data = request.get_json()
configurationName = data.get('configurationName')
rawConfiguration = data.get('rawConfiguration')
if configurationName is None or len(
configurationName) == 0 or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Please provide a valid configuration name")
if rawConfiguration is None or len(rawConfiguration) == 0:
return ResponseObject(False, "Please provide content")
status, err = WireguardConfigurations[configurationName].updateRawConfigurationFile(rawConfiguration)
return ResponseObject(status=status, message=err)
@app.post(f'{APP_PREFIX}/api/deleteWireguardConfiguration')
def API_deleteWireguardConfiguration():
data = request.get_json()
if "ConfigurationName" not in data.keys() or data.get("ConfigurationName") is None or data.get("ConfigurationName") not in WireguardConfigurations.keys():
return ResponseObject(False, "Please provide the configuration name you want to delete", status_code=404)
status = WireguardConfigurations[data.get("ConfigurationName")].deleteConfiguration()
if status:
WireguardConfigurations.pop(data.get("ConfigurationName"))
return ResponseObject(status)
@app.post(f'{APP_PREFIX}/api/renameWireguardConfiguration')
def API_renameWireguardConfiguration():
data = request.get_json()
keys = ["ConfigurationName", "NewConfigurationName"]
for k in keys:
if (k not in data.keys() or data.get(k) is None or len(data.get(k)) == 0 or
(k == "ConfigurationName" and data.get(k) not in WireguardConfigurations.keys())):
return ResponseObject(False, "Please provide the configuration name you want to rename", status_code=404)
status, message = WireguardConfigurations[data.get("ConfigurationName")].renameConfiguration(data.get("NewConfigurationName"))
if status:
WireguardConfigurations.pop(data.get("ConfigurationName"))
WireguardConfigurations[data.get("NewConfigurationName")] = WireguardConfiguration(data.get("NewConfigurationName"))
return ResponseObject(status, message)
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationRealtimeTraffic')
def API_getWireguardConfigurationRealtimeTraffic():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist", status_code=404)
return ResponseObject(data=WireguardConfigurations[configurationName].getRealtimeTrafficUsage())
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationBackup')
def API_getWireguardConfigurationBackup():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist", status_code=404)
return ResponseObject(data=WireguardConfigurations[configurationName].getBackups())
@app.get(f'{APP_PREFIX}/api/getAllWireguardConfigurationBackup')
def API_getAllWireguardConfigurationBackup():
data = {
"ExistingConfigurations": {},
"NonExistingConfigurations": {}
}
existingConfiguration = WireguardConfigurations.keys()
for i in existingConfiguration:
b = WireguardConfigurations[i].getBackups(True)
if len(b) > 0:
data['ExistingConfigurations'][i] = WireguardConfigurations[i].getBackups(True)
for protocol in ProtocolsEnabled():
directory = os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup')
files = [(file, os.path.getctime(os.path.join(directory, file)))
for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))]
files.sort(key=lambda x: x[1], reverse=True)
for f, ct in files:
if RegexMatch(r"^(.*)_(.*)\.(conf)$", f):
s = re.search(r"^(.*)_(.*)\.(conf)$", f)
name = s.group(1)
if name not in existingConfiguration:
if name not in data['NonExistingConfigurations'].keys():
data['NonExistingConfigurations'][name] = []
date = s.group(2)
d = {
"protocol": protocol,
"filename": f,
"backupDate": date,
"content": open(os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup', f), 'r').read()
}
if f.replace(".conf", ".sql") in list(os.listdir(directory)):
d['database'] = True
d['databaseContent'] = open(os.path.join(DashboardConfig.GetConfig("Server", f"{protocol}_conf_path")[1], 'WGDashboard_Backup', f.replace(".conf", ".sql")), 'r').read()
data['NonExistingConfigurations'][name].append(d)
return ResponseObject(data=data)
@app.get(f'{APP_PREFIX}/api/createWireguardConfigurationBackup')
def API_createWireguardConfigurationBackup():
configurationName = request.args.get('configurationName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist", status_code=404)
return ResponseObject(status=WireguardConfigurations[configurationName].backupConfigurationFile()[0],
data=WireguardConfigurations[configurationName].getBackups())
@app.post(f'{APP_PREFIX}/api/deleteWireguardConfigurationBackup')
def API_deleteWireguardConfigurationBackup():
data = request.get_json()
if ("ConfigurationName" not in data.keys() or
"BackupFileName" not in data.keys() or
len(data['ConfigurationName']) == 0 or
len(data['BackupFileName']) == 0):
return ResponseObject(False,
"Please provide configurationName and backupFileName in body", status_code=400)
configurationName = data['ConfigurationName']
backupFileName = data['BackupFileName']
if configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist", status_code=404)
status = WireguardConfigurations[configurationName].deleteBackup(backupFileName)
return ResponseObject(status=status, message=(None if status else 'Backup file does not exist'),
status_code = (200 if status else 404))
@app.get(f'{APP_PREFIX}/api/downloadWireguardConfigurationBackup')
def API_downloadWireguardConfigurationBackup():
configurationName = request.args.get('configurationName')
backupFileName = request.args.get('backupFileName')
if configurationName is None or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist", status_code=404)
status, zip = WireguardConfigurations[configurationName].downloadBackup(backupFileName)
return ResponseObject(status, data=zip, status_code=(200 if status else 404))
@app.post(f'{APP_PREFIX}/api/restoreWireguardConfigurationBackup')
def API_restoreWireguardConfigurationBackup():
data = request.get_json()
if ("ConfigurationName" not in data.keys() or
"BackupFileName" not in data.keys() or
len(data['ConfigurationName']) == 0 or
len(data['BackupFileName']) == 0):
return ResponseObject(False,
"Please provide ConfigurationName and BackupFileName in body", status_code=400)
configurationName = data['ConfigurationName']
backupFileName = data['BackupFileName']
if configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist", status_code=404)
status = WireguardConfigurations[configurationName].restoreBackup(backupFileName)
return ResponseObject(status=status, message=(None if status else 'Restore backup failed'))
@app.get(f'{APP_PREFIX}/api/getDashboardConfiguration')
def API_getDashboardConfiguration():
return ResponseObject(data=DashboardConfig.toJson())
@app.post(f'{APP_PREFIX}/api/updateDashboardConfigurationItem')
def API_updateDashboardConfigurationItem():
data = request.get_json()
if "section" not in data.keys() or "key" not in data.keys() or "value" not in data.keys():
return ResponseObject(False, "Invalid request.")
valid, msg = DashboardConfig.SetConfig(
data["section"], data["key"], data['value'])
if not valid:
return ResponseObject(False, msg, status_code=404)
if data['section'] == "Server":
if data['key'] == 'wg_conf_path':
WireguardConfigurations.clear()
WireguardConfigurations.clear()
InitWireguardConfigurationsList()
return ResponseObject(True, data=DashboardConfig.GetConfig(data["section"], data["key"])[1])
@app.get(f'{APP_PREFIX}/api/getDashboardAPIKeys')
def API_getDashboardAPIKeys():
if DashboardConfig.GetConfig('Server', 'dashboard_api_key'):
return ResponseObject(data=DashboardConfig.DashboardAPIKeys)
return ResponseObject(False, "WGDashboard API Keys function is disabled")
@app.post(f'{APP_PREFIX}/api/newDashboardAPIKey')
def API_newDashboardAPIKey():
data = request.get_json()
if DashboardConfig.GetConfig('Server', 'dashboard_api_key'):
try:
if data['NeverExpire']:
expiredAt = None
else:
expiredAt = datetime.strptime(data['ExpiredAt'], '%Y-%m-%d %H:%M:%S')
DashboardConfig.createAPIKeys(expiredAt)
return ResponseObject(True, data=DashboardConfig.DashboardAPIKeys)
except Exception as e:
return ResponseObject(False, str(e))
return ResponseObject(False, "Dashboard API Keys function is disbaled")
@app.post(f'{APP_PREFIX}/api/deleteDashboardAPIKey')
def API_deleteDashboardAPIKey():
data = request.get_json()
if DashboardConfig.GetConfig('Server', 'dashboard_api_key'):
if len(data['Key']) > 0 and len(list(filter(lambda x : x.Key == data['Key'], DashboardConfig.DashboardAPIKeys))) > 0:
DashboardConfig.deleteAPIKey(data['Key'])
return ResponseObject(True, data=DashboardConfig.DashboardAPIKeys)
else:
return ResponseObject(False, "API Key does not exist", status_code=404)
return ResponseObject(False, "Dashboard API Keys function is disbaled")
@app.post(f'{APP_PREFIX}/api/updatePeerSettings/')
def API_updatePeerSettings(configName):
data = request.get_json()
id = data['id']
if len(id) > 0 and configName in WireguardConfigurations.keys():
name = data['name']
private_key = data['private_key']
dns_addresses = data['DNS']
allowed_ip = data['allowed_ip']
endpoint_allowed_ip = data['endpoint_allowed_ip']
preshared_key = data['preshared_key']
mtu = data['mtu']
keepalive = data['keepalive']
wireguardConfig = WireguardConfigurations[configName]
foundPeer, peer = wireguardConfig.searchPeer(id)
if foundPeer:
if wireguardConfig.Protocol == 'wg':
return peer.updatePeer(name, private_key, preshared_key, dns_addresses,
allowed_ip, endpoint_allowed_ip, mtu, keepalive)
return peer.updatePeer(name, private_key, preshared_key, dns_addresses,
allowed_ip, endpoint_allowed_ip, mtu, keepalive, data.get('advanced_security', 'off'))
return ResponseObject(False, "Peer does not exist")
@app.post(f'{APP_PREFIX}/api/resetPeerData/')
def API_resetPeerData(configName):
data = request.get_json()
id = data['id']
type = data['type']
if len(id) == 0 or configName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration/Peer does not exist")
wgc = WireguardConfigurations.get(configName)
foundPeer, peer = wgc.searchPeer(id)
if not foundPeer:
return ResponseObject(False, "Configuration/Peer does not exist")
resetStatus = peer.resetDataUsage(type)
if resetStatus:
wgc.restrictPeers([id])
wgc.allowAccessPeers([id])
return ResponseObject(status=resetStatus)
@app.post(f'{APP_PREFIX}/api/deletePeers/')
def API_deletePeers(configName: str) -> ResponseObject:
data = request.get_json()
peers = data['peers']
if configName in WireguardConfigurations.keys():
if len(peers) == 0:
return ResponseObject(False, "Please specify one or more peers")
configuration = WireguardConfigurations.get(configName)
return configuration.deletePeers(peers)
return ResponseObject(False, "Configuration does not exist")
@app.post(f'{APP_PREFIX}/api/restrictPeers/')
def API_restrictPeers(configName: str) -> ResponseObject:
data = request.get_json()
peers = data['peers']
if configName in WireguardConfigurations.keys():
if len(peers) == 0:
return ResponseObject(False, "Please specify one or more peers")
configuration = WireguardConfigurations.get(configName)
return configuration.restrictPeers(peers)
return ResponseObject(False, "Configuration does not exist", status_code=404)
@app.post(f'{APP_PREFIX}/api/sharePeer/create')
def API_sharePeer_create():
data: dict[str, str] = request.get_json()
Configuration = data.get('Configuration')
Peer = data.get('Peer')
ExpireDate = data.get('ExpireDate')
if Configuration is None or Peer is None:
return ResponseObject(False, "Please specify configuration and peers")
activeLink = AllPeerShareLinks.getLink(Configuration, Peer)
if len(activeLink) > 0:
return ResponseObject(True,
"This peer is already sharing. Please view data for shared link.",
data=activeLink[0]
)
status, message = AllPeerShareLinks.addLink(Configuration, Peer, ExpireDate)
if not status:
return ResponseObject(status, message)
return ResponseObject(data=AllPeerShareLinks.getLinkByID(message))
@app.post(f'{APP_PREFIX}/api/sharePeer/update')
def API_sharePeer_update():
data: dict[str, str] = request.get_json()
ShareID: str = data.get("ShareID")
ExpireDate: str = data.get("ExpireDate")
if ShareID is None:
return ResponseObject(False, "Please specify ShareID")
if len(AllPeerShareLinks.getLinkByID(ShareID)) == 0:
return ResponseObject(False, "ShareID does not exist")
status, message = AllPeerShareLinks.updateLinkExpireDate(ShareID, ExpireDate)
if not status:
return ResponseObject(status, message)
return ResponseObject(data=AllPeerShareLinks.getLinkByID(ShareID))
@app.get(f'{APP_PREFIX}/api/sharePeer/get')
def API_sharePeer_get():
data = request.args
ShareID = data.get("ShareID")
if ShareID is None or len(ShareID) == 0:
return ResponseObject(False, "Please provide ShareID")
link = AllPeerShareLinks.getLinkByID(ShareID)
if len(link) == 0:
return ResponseObject(False, "This link is either expired to invalid")
l = link[0]
if l.Configuration not in WireguardConfigurations.keys():
return ResponseObject(False, "The peer you're looking for does not exist")
c = WireguardConfigurations.get(l.Configuration)
fp, p = c.searchPeer(l.Peer)
if not fp:
return ResponseObject(False, "The peer you're looking for does not exist")
return ResponseObject(data=p.downloadPeer())
@app.post(f'{APP_PREFIX}/api/allowAccessPeers/')
def API_allowAccessPeers(configName: str) -> ResponseObject:
data = request.get_json()
peers = data['peers']
if configName in WireguardConfigurations.keys():
if len(peers) == 0:
return ResponseObject(False, "Please specify one or more peers")
configuration = WireguardConfigurations.get(configName)
return configuration.allowAccessPeers(peers)
return ResponseObject(False, "Configuration does not exist")
@app.post(f'{APP_PREFIX}/api/addPeers/')
def API_addPeers(configName):
if configName in WireguardConfigurations.keys():
try:
data: dict = request.get_json()
bulkAdd: bool = data.get("bulkAdd", False)
bulkAddAmount: int = data.get('bulkAddAmount', 0)
preshared_key_bulkAdd: bool = data.get('preshared_key_bulkAdd', False)
public_key: str = data.get('public_key', "")
allowed_ips: list[str] = data.get('allowed_ips', [])
allowed_ips_validation: bool = data.get('allowed_ips_validation', True)
endpoint_allowed_ip: str = data.get('endpoint_allowed_ip', DashboardConfig.GetConfig("Peers", "peer_endpoint_allowed_ip")[1])
dns_addresses: str = data.get('DNS', DashboardConfig.GetConfig("Peers", "peer_global_DNS")[1])
mtu: int = data.get('mtu', int(DashboardConfig.GetConfig("Peers", "peer_MTU")[1]))
keep_alive: int = data.get('keepalive', int(DashboardConfig.GetConfig("Peers", "peer_keep_alive")[1]))
preshared_key: str = data.get('preshared_key', "")
if type(mtu) is not int or mtu < 0 or mtu > 1460:
mtu = int(DashboardConfig.GetConfig("Peers", "peer_MTU")[1])
if type(keep_alive) is not int or keep_alive < 0:
keep_alive = int(DashboardConfig.GetConfig("Peers", "peer_keep_alive")[1])
config = WireguardConfigurations.get(configName)
if not config.getStatus():
config.toggleConfiguration()
ipStatus, availableIps = config.getAvailableIP(-1)
ipCountStatus, numberOfAvailableIPs = config.getNumberOfAvailableIP()
defaultIPSubnet = list(availableIps.keys())[0]
if bulkAdd:
if type(preshared_key_bulkAdd) is not bool:
preshared_key_bulkAdd = False
if type(bulkAddAmount) is not int or bulkAddAmount < 1:
return ResponseObject(False, "Please specify amount of peers you want to add")
if not ipStatus:
return ResponseObject(False, "No more available IP can assign")
if len(availableIps.keys()) == 0:
return ResponseObject(False, "This configuration does not have any IP address available")
if bulkAddAmount > sum(list(numberOfAvailableIPs.values())):
return ResponseObject(False,
f"The maximum number of peers can add is {sum(list(numberOfAvailableIPs.values()))}")
keyPairs = []
addedCount = 0
for subnet in availableIps.keys():
for ip in availableIps[subnet]:
newPrivateKey = GenerateWireguardPrivateKey()[1]
addedCount += 1
keyPairs.append({
"private_key": newPrivateKey,
"id": GenerateWireguardPublicKey(newPrivateKey)[1],
"preshared_key": (GenerateWireguardPrivateKey()[1] if preshared_key_bulkAdd else ""),
"allowed_ip": ip,
"name": f"BulkPeer_{(addedCount + 1)}_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
"DNS": dns_addresses,
"endpoint_allowed_ip": endpoint_allowed_ip,
"mtu": mtu,
"keepalive": keep_alive,
"advanced_security": data.get("advanced_security", "off")
})
if addedCount == bulkAddAmount:
break
if addedCount == bulkAddAmount:
break
if len(keyPairs) == 0 or (bulkAdd and len(keyPairs) != bulkAddAmount):
return ResponseObject(False, "Generating key pairs by bulk failed")
status, result = config.addPeers(keyPairs)
return ResponseObject(status=status, message=result['message'], data=result['peers'])
else:
if config.searchPeer(public_key)[0] is True:
return ResponseObject(False, f"This peer already exist")
name = data.get("name", "")
private_key = data.get("private_key", "")
if len(public_key) == 0:
if len(private_key) == 0:
private_key = GenerateWireguardPrivateKey()[1]
public_key = GenerateWireguardPublicKey(private_key)[1]
else:
public_key = GenerateWireguardPublicKey(private_key)[1]
else:
if len(private_key) > 0:
genPub = GenerateWireguardPublicKey(private_key)[1]
# Check if provided pubkey match provided private key
if public_key != genPub:
return ResponseObject(False, "Provided Public Key does not match provided Private Key")
# if len(public_key) == 0 and len(private_key) == 0:
# private_key = GenerateWireguardPrivateKey()[1]
# public_key = GenerateWireguardPublicKey(private_key)[1]
# elif len(public_key) == 0 and len(private_key) > 0:
# public_key = GenerateWireguardPublicKey(private_key)[1]
if len(allowed_ips) == 0:
if ipStatus:
for subnet in availableIps.keys():
for ip in availableIps[subnet]:
allowed_ips = [ip]
break
break
else:
return ResponseObject(False, "No more available IP can assign")
if allowed_ips_validation:
for i in allowed_ips:
found = False
for subnet in availableIps.keys():
network = ipaddress.ip_network(subnet, False)
ap = ipaddress.ip_network(i)
if network.version == ap.version and ap.subnet_of(network):
found = True
if not found:
return ResponseObject(False, f"This IP is not available: {i}")
status, result = config.addPeers([
{
"name": name,
"id": public_key,
"private_key": private_key,
"allowed_ip": ','.join(allowed_ips),
"preshared_key": preshared_key,
"endpoint_allowed_ip": endpoint_allowed_ip,
"DNS": dns_addresses,
"mtu": mtu,
"keepalive": keep_alive,
"advanced_security": data.get("advanced_security", "off")
}]
)
return ResponseObject(status=status, message=result['message'], data=result['peers'])
except Exception as e:
print(e, str(e.__traceback__))
return ResponseObject(False, "Add peers failed. Please see data for specific issue")
return ResponseObject(False, "Configuration does not exist")
@app.get(f"{APP_PREFIX}/api/downloadPeer/")
def API_downloadPeer(configName):
data = request.args
if configName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
configuration = WireguardConfigurations[configName]
peerFound, peer = configuration.searchPeer(data['id'])
if len(data['id']) == 0 or not peerFound:
return ResponseObject(False, "Peer does not exist")
return ResponseObject(data=peer.downloadPeer())
@app.get(f"{APP_PREFIX}/api/downloadAllPeers/")
def API_downloadAllPeers(configName):
if configName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
configuration = WireguardConfigurations[configName]
peerData = []
untitledPeer = 0
for i in configuration.Peers:
file = i.downloadPeer()
if file["fileName"] == "UntitledPeer":
file["fileName"] = str(untitledPeer) + "_" + file["fileName"]
untitledPeer += 1
peerData.append(file)
return ResponseObject(data=peerData)
@app.get(f"{APP_PREFIX}/api/getAvailableIPs/")
def API_getAvailableIPs(configName):
if configName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
status, ips = WireguardConfigurations.get(configName).getAvailableIP()
return ResponseObject(status=status, data=ips)
@app.get(f"{APP_PREFIX}/api/getNumberOfAvailableIPs/")
def API_getNumberOfAvailableIPs(configName):
if configName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
status, ips = WireguardConfigurations.get(configName).getNumberOfAvailableIP()
return ResponseObject(status=status, data=ips)
@app.get(f'{APP_PREFIX}/api/getWireguardConfigurationInfo')
def API_getConfigurationInfo():
configurationName = request.args.get("configurationName")
if not configurationName or configurationName not in WireguardConfigurations.keys():
return ResponseObject(False, "Please provide configuration name")
return ResponseObject(data={
"configurationInfo": WireguardConfigurations[configurationName],
"configurationPeers": WireguardConfigurations[configurationName].getPeersList(),
"configurationRestrictedPeers": WireguardConfigurations[configurationName].getRestrictedPeersList()
})
@app.get(f'{APP_PREFIX}/api/getDashboardTheme')
def API_getDashboardTheme():
return ResponseObject(data=DashboardConfig.GetConfig("Server", "dashboard_theme")[1])
@app.get(f'{APP_PREFIX}/api/getDashboardVersion')
def API_getDashboardVersion():
return ResponseObject(data=DashboardConfig.GetConfig("Server", "version")[1])
@app.post(f'{APP_PREFIX}/api/savePeerScheduleJob')
def API_savePeerScheduleJob():
data = request.json
if "Job" not in data.keys():
return ResponseObject(False, "Please specify job")
job: dict = data['Job']
if "Peer" not in job.keys() or "Configuration" not in job.keys():
return ResponseObject(False, "Please specify peer and configuration")
configuration = WireguardConfigurations.get(job['Configuration'])
if configuration is None:
return ResponseObject(False, "Configuration does not exist")
f, fp = configuration.searchPeer(job['Peer'])
if not f:
return ResponseObject(False, "Peer does not exist")
s, p = AllPeerJobs.saveJob(PeerJob(
job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'],
job['CreationDate'], job['ExpireDate'], job['Action']))
if s:
return ResponseObject(s, data=p)
return ResponseObject(s, message=p)
@app.post(f'{APP_PREFIX}/api/deletePeerScheduleJob')
def API_deletePeerScheduleJob():
data = request.json
if "Job" not in data.keys():
return ResponseObject(False, "Please specify job")
job: dict = data['Job']
if "Peer" not in job.keys() or "Configuration" not in job.keys():
return ResponseObject(False, "Please specify peer and configuration")
configuration = WireguardConfigurations.get(job['Configuration'])
if configuration is None:
return ResponseObject(False, "Configuration does not exist")
f, fp = configuration.searchPeer(job['Peer'])
if not f:
return ResponseObject(False, "Peer does not exist")
s, p = AllPeerJobs.deleteJob(PeerJob(
job['JobID'], job['Configuration'], job['Peer'], job['Field'], job['Operator'], job['Value'],
job['CreationDate'], job['ExpireDate'], job['Action']))
if s:
return ResponseObject(s, data=p)
return ResponseObject(s, message=p)
@app.get(f'{APP_PREFIX}/api/getPeerScheduleJobLogs/')
def API_getPeerScheduleJobLogs(configName):
if configName not in WireguardConfigurations.keys():
return ResponseObject(False, "Configuration does not exist")
data = request.args.get("requestAll")
requestAll = False
if data is not None and data == "true":
requestAll = True
return ResponseObject(data=JobLogger.getLogs(requestAll, configName))
'''
File Download
'''
@app.get(f'{APP_PREFIX}/fileDownload')
def API_download():
file = request.args.get('file')
if file is None or len(file) == 0:
return ResponseObject(False, "Please specify a file")
if os.path.exists(os.path.join('download', file)):
return send_file(os.path.join('download', file), as_attachment=True)
else:
return ResponseObject(False, "File does not exist")
'''
Tools
'''
@app.get(f'{APP_PREFIX}/api/ping/getAllPeersIpAddress')
def API_ping_getAllPeersIpAddress():
ips = {}
for c in WireguardConfigurations.values():
cips = {}
for p in c.Peers:
allowed_ip = p.allowed_ip.replace(" ", "").split(",")
parsed = []
for x in allowed_ip:
try:
ip = ipaddress.ip_network(x, strict=False)
except ValueError as e:
print(f"{p.id} - {c.Name}")
if len(list(ip.hosts())) == 1:
parsed.append(str(ip.hosts()[0]))
endpoint = p.endpoint.replace(" ", "").replace("(none)", "")
if len(p.name) > 0:
cips[f"{p.name} - {p.id}"] = {
"allowed_ips": parsed,
"endpoint": endpoint
}
else:
cips[f"{p.id}"] = {
"allowed_ips": parsed,
"endpoint": endpoint
}
ips[c.Name] = cips
return ResponseObject(data=ips)
import requests
@app.get(f'{APP_PREFIX}/api/ping/execute')
def API_ping_execute():
if "ipAddress" in request.args.keys() and "count" in request.args.keys():
ip = request.args['ipAddress']
count = request.args['count']
try:
if ip is not None and len(ip) > 0 and count is not None and count.isnumeric():
result = ping(ip, count=int(count), source=None)
data = {
"address": result.address,
"is_alive": result.is_alive,
"min_rtt": result.min_rtt,
"avg_rtt": result.avg_rtt,
"max_rtt": result.max_rtt,
"package_sent": result.packets_sent,
"package_received": result.packets_received,
"package_loss": result.packet_loss,
"geo": None
}
try:
r = requests.get(f"http://ip-api.com/json/{result.address}?field=city")
data['geo'] = r.json()
except Exception as e:
pass
return ResponseObject(data=data)
return ResponseObject(False, "Please specify an IP Address (v4/v6)")
except Exception as exp:
return ResponseObject(False, exp)
return ResponseObject(False, "Please provide ipAddress and count")
@app.get(f'{APP_PREFIX}/api/traceroute/execute')
def API_traceroute_execute():
if "ipAddress" in request.args.keys() and len(request.args.get("ipAddress")) > 0:
ipAddress = request.args.get('ipAddress')
try:
tracerouteResult = traceroute(ipAddress, timeout=1, max_hops=64)
result = []
for hop in tracerouteResult:
if len(result) > 1:
skipped = False
for i in range(result[-1]["hop"] + 1, hop.distance):
result.append(
{
"hop": i,
"ip": "*",
"avg_rtt": "*",
"min_rtt": "*",
"max_rtt": "*"
}
)
skip = True
if skipped: continue
result.append(
{
"hop": hop.distance,
"ip": hop.address,
"avg_rtt": hop.avg_rtt,
"min_rtt": hop.min_rtt,
"max_rtt": hop.max_rtt
})
try:
r = requests.post(f"http://ip-api.com/batch?fields=city,country,lat,lon,query",
data=json.dumps([x['ip'] for x in result]))
d = r.json()
for i in range(len(result)):
result[i]['geo'] = d[i]
except Exception as e:
return ResponseObject(data=result, message="Failed to request IP address geolocation")
return ResponseObject(data=result)
except Exception as exp:
return ResponseObject(False, exp)
else:
return ResponseObject(False, "Please provide ipAddress")
@app.get(f'{APP_PREFIX}/api/getDashboardUpdate')
def API_getDashboardUpdate():
import urllib.request as req
try:
r = req.urlopen("https://api.github.com/repos/donaldzou/WGDashboard/releases/latest", timeout=5).read()
data = dict(json.loads(r))
tagName = data.get('tag_name')
htmlUrl = data.get('html_url')
if tagName is not None and htmlUrl is not None:
if version.parse(tagName) > version.parse(DASHBOARD_VERSION):
return ResponseObject(message=f"{tagName} is now available for update!", data=htmlUrl)
else:
return ResponseObject(message="You're on the latest version")
return ResponseObject(False)
except Exception as e:
return ResponseObject(False, f"Request to GitHub API failed.")
'''
Sign Up
'''
@app.get(f'{APP_PREFIX}/api/isTotpEnabled')
def API_isTotpEnabled():
return (
ResponseObject(data=DashboardConfig.GetConfig("Account", "enable_totp")[1] and DashboardConfig.GetConfig("Account", "totp_verified")[1]))
@app.get(f'{APP_PREFIX}/api/Welcome_GetTotpLink')
def API_Welcome_GetTotpLink():
if not DashboardConfig.GetConfig("Account", "totp_verified")[1]:
DashboardConfig.SetConfig("Account", "totp_key", pyotp.random_base32())
return ResponseObject(
data=pyotp.totp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).provisioning_uri(
issuer_name="WGDashboard"))
return ResponseObject(False)
@app.post(f'{APP_PREFIX}/api/Welcome_VerifyTotpLink')
def API_Welcome_VerifyTotpLink():
data = request.get_json()
totp = pyotp.TOTP(DashboardConfig.GetConfig("Account", "totp_key")[1]).now()
if totp == data['totp']:
DashboardConfig.SetConfig("Account", "totp_verified", "true")
DashboardConfig.SetConfig("Account", "enable_totp", "true")
return ResponseObject(totp == data['totp'])
@app.post(f'{APP_PREFIX}/api/Welcome_Finish')
def API_Welcome_Finish():
data = request.get_json()
if DashboardConfig.GetConfig("Other", "welcome_session")[1]:
if data["username"] == "":
return ResponseObject(False, "Username cannot be blank.")
if data["newPassword"] == "" or len(data["newPassword"]) < 8:
return ResponseObject(False, "Password must be at least 8 characters")
updateUsername, updateUsernameErr = DashboardConfig.SetConfig("Account", "username", data["username"])
updatePassword, updatePasswordErr = DashboardConfig.SetConfig("Account", "password",
{
"newPassword": data["newPassword"],
"repeatNewPassword": data["repeatNewPassword"],
"currentPassword": "admin"
})
if not updateUsername or not updatePassword:
return ResponseObject(False, f"{updateUsernameErr},{updatePasswordErr}".strip(","))
DashboardConfig.SetConfig("Other", "welcome_session", False)
return ResponseObject()
class Locale:
def __init__(self):
self.localePath = './static/locale/'
self.activeLanguages = {}
with open(os.path.join(f"{self.localePath}active_languages.json"), "r") as f:
self.activeLanguages = sorted(json.loads(''.join(f.readlines())), key=lambda x : x['lang_name'])
def getLanguage(self) -> dict | None:
currentLanguage = DashboardConfig.GetConfig("Server", "dashboard_language")[1]
if currentLanguage == "en":
return None
if os.path.exists(os.path.join(f"{self.localePath}{currentLanguage}.json")):
with open(os.path.join(f"{self.localePath}{currentLanguage}.json"), "r") as f:
return dict(json.loads(''.join(f.readlines())))
else:
return None
def updateLanguage(self, lang_id):
if not os.path.exists(os.path.join(f"{self.localePath}{lang_id}.json")):
DashboardConfig.SetConfig("Server", "dashboard_language", "en")
else:
DashboardConfig.SetConfig("Server", "dashboard_language", lang_id)
Locale = Locale()
@app.get(f'{APP_PREFIX}/api/locale')
def API_Locale_CurrentLang():
return ResponseObject(data=Locale.getLanguage())
@app.get(f'{APP_PREFIX}/api/locale/available')
def API_Locale_Available():
return ResponseObject(data=Locale.activeLanguages)
@app.post(f'{APP_PREFIX}/api/locale/update')
def API_Locale_Update():
data = request.get_json()
if 'lang_id' not in data.keys():
return ResponseObject(False, "Please specify a lang_id")
Locale.updateLanguage(data['lang_id'])
return ResponseObject(data=Locale.getLanguage())
@app.get(f'{APP_PREFIX}/api/email/ready')
def API_Email_Ready():
return ResponseObject(EmailSender.ready())
@app.post(f'{APP_PREFIX}/api/email/send')
def API_Email_Send():
data = request.get_json()
if "Receiver" not in data.keys():
return ResponseObject(False, "Please at least specify receiver")
body = data.get('Body', '')
download = None
if ("ConfigurationName" in data.keys()
and "Peer" in data.keys()):
if data.get('ConfigurationName') in WireguardConfigurations.keys():
configuration = WireguardConfigurations.get(data.get('ConfigurationName'))
attachmentName = ""
if configuration is not None:
fp, p = configuration.searchPeer(data.get('Peer'))
if fp:
template = Template(body)
download = p.downloadPeer()
body = template.render(peer=p.toJson(), configurationFile=download)
if data.get('IncludeAttachment', False):
u = str(uuid4())
attachmentName = f'{u}.conf'
with open(os.path.join('./attachments', attachmentName,), 'w+') as f:
f.write(download['file'])
s, m = EmailSender.send(data.get('Receiver'), data.get('Subject', ''), body,
data.get('IncludeAttachment', False), (attachmentName if download else ''))
return ResponseObject(s, m)
@app.post(f'{APP_PREFIX}/api/email/previewBody')
def API_Email_PreviewBody():
data = request.get_json()
body = data.get('Body', '')
if len(body) == 0:
return ResponseObject(False, "Nothing to preview")
if ("ConfigurationName" not in data.keys()
or "Peer" not in data.keys() or data.get('ConfigurationName') not in WireguardConfigurations.keys()):
return ResponseObject(False, "Please specify configuration and peer")
configuration = WireguardConfigurations.get(data.get('ConfigurationName'))
fp, p = configuration.searchPeer(data.get('Peer'))
if not fp:
return ResponseObject(False, "Peer does not exist")
try:
template = Template(body)
download = p.downloadPeer()
body = template.render(peer=p.toJson(), configurationFile=download)
return ResponseObject(data=body)
except Exception as e:
return ResponseObject(False, message=str(e))
@app.get(f'{APP_PREFIX}/api/systemStatus')
def API_SystemStatus():
return ResponseObject(data=SystemStatus)
@app.get(f'{APP_PREFIX}/api/protocolsEnabled')
def API_ProtocolsEnabled():
return ResponseObject(data=ProtocolsEnabled())
@app.get(f'{APP_PREFIX}/')
def index():
return render_template('index.html')
def peerInformationBackgroundThread():
global WireguardConfigurations
print(f"[WGDashboard] Background Thread #1 Started", flush=True)
time.sleep(10)
while True:
with app.app_context():
for c in WireguardConfigurations.values():
if c.getStatus():
try:
c.getPeersTransfer()
c.getPeersLatestHandshake()
c.getPeersEndpoint()
c.getPeersList()
c.getRestrictedPeersList()
except Exception as e:
print(f"[WGDashboard] Background Thread #1 Error: {str(e)}", flush=True)
time.sleep(10)
def peerJobScheduleBackgroundThread():
with app.app_context():
print(f"[WGDashboard] Background Thread #2 Started", flush=True)
time.sleep(10)
while True:
AllPeerJobs.runJob()
time.sleep(180)
def gunicornConfig():
_, app_ip = DashboardConfig.GetConfig("Server", "app_ip")
_, app_port = DashboardConfig.GetConfig("Server", "app_port")
return app_ip, app_port
def ProtocolsEnabled() -> list[str]:
from shutil import which
protocols = []
if which('awg') is not None and which('awg-quick') is not None:
protocols.append("awg")
if which('wg') is not None and which('wg-quick') is not None:
protocols.append("wg")
return protocols
def InitWireguardConfigurationsList(startup: bool = False):
if os.path.exists(DashboardConfig.GetConfig("Server", "wg_conf_path")[1]):
confs = os.listdir(DashboardConfig.GetConfig("Server", "wg_conf_path")[1])
confs.sort()
for i in confs:
if RegexMatch("^(.{1,}).(conf)$", i):
i = i.replace('.conf', '')
try:
if i in WireguardConfigurations.keys():
if WireguardConfigurations[i].configurationFileChanged():
WireguardConfigurations[i] = WireguardConfiguration(i)
else:
WireguardConfigurations[i] = WireguardConfiguration(i, startup=startup)
except WireguardConfiguration.InvalidConfigurationFileException as e:
print(f"{i} have an invalid configuration file.")
if "awg" in ProtocolsEnabled():
confs = os.listdir(DashboardConfig.GetConfig("Server", "awg_conf_path")[1])
confs.sort()
for i in confs:
if RegexMatch("^(.{1,}).(conf)$", i):
i = i.replace('.conf', '')
try:
if i in WireguardConfigurations.keys():
if WireguardConfigurations[i].configurationFileChanged():
WireguardConfigurations[i] = AmneziaWireguardConfiguration(i)
else:
WireguardConfigurations[i] = AmneziaWireguardConfiguration(i, startup=startup)
except WireguardConfigurations.InvalidConfigurationFileException as e:
print(f"{i} have an invalid configuration file.")
AllPeerShareLinks: PeerShareLinks = PeerShareLinks()
AllPeerJobs: PeerJobs = PeerJobs()
JobLogger: PeerJobLogger = PeerJobLogger(CONFIGURATION_PATH, AllPeerJobs)
DashboardLogger: DashboardLogger = DashboardLogger(CONFIGURATION_PATH)
_, app_ip = DashboardConfig.GetConfig("Server", "app_ip")
_, app_port = DashboardConfig.GetConfig("Server", "app_port")
_, WG_CONF_PATH = DashboardConfig.GetConfig("Server", "wg_conf_path")
WireguardConfigurations: dict[str, WireguardConfiguration] = {}
AmneziaWireguardConfigurations: dict[str, AmneziaWireguardConfiguration] = {}
InitWireguardConfigurationsList(startup=True)
def startThreads():
bgThread = threading.Thread(target=peerInformationBackgroundThread, daemon=True)
bgThread.start()
scheduleJobThread = threading.Thread(target=peerJobScheduleBackgroundThread, daemon=True)
scheduleJobThread.start()
if __name__ == "__main__":
startThreads()
app.run(host=app_ip, debug=False, port=app_port)
```
## /src/gunicorn.conf.py
```py path="/src/gunicorn.conf.py"
import os.path
import dashboard, configparser
from datetime import datetime
global sqldb, cursor, DashboardConfig, WireguardConfigurations, AllPeerJobs, JobLogger
app_host, app_port = dashboard.gunicornConfig()
date = datetime.today().strftime('%Y_%m_%d_%H_%M_%S')
def post_worker_init(worker):
dashboard.startThreads()
worker_class = 'gthread'
workers = 1
threads = 1
bind = f"{app_host}:{app_port}"
daemon = True
pidfile = './gunicorn.pid'
wsgi_app = "dashboard:app"
accesslog = f"./log/access_{date}.log"
log_level = "debug"
capture_output = True
errorlog = f"./log/error_{date}.log"
pythonpath = "., ./modules"
print(f"[Gunicorn] WGDashboard w/ Gunicorn will be running on {bind}", flush=True)
print(f"[Gunicorn] Access log file is at {accesslog}", flush=True)
print(f"[Gunicorn] Error log file is at {errorlog}", flush=True)
```
## /src/modules/DashboardLogger.py
```py path="/src/modules/DashboardLogger.py"
"""
Dashboard Logger Class
"""
import sqlite3, os, uuid
class DashboardLogger:
def __init__(self, CONFIGURATION_PATH):
self.loggerdb = sqlite3.connect(os.path.join(CONFIGURATION_PATH, 'db', 'wgdashboard_log.db'),
isolation_level=None,
check_same_thread=False)
self.loggerdb.row_factory = sqlite3.Row
self.__createLogDatabase()
self.log(Message="WGDashboard started")
def __createLogDatabase(self):
with self.loggerdb:
loggerdbCursor = self.loggerdb.cursor()
existingTable = loggerdbCursor.execute("SELECT name from sqlite_master where type='table'").fetchall()
existingTable = [t['name'] for t in existingTable]
if "DashboardLog" not in existingTable:
loggerdbCursor.execute(
"CREATE TABLE DashboardLog (LogID VARCHAR NOT NULL, LogDate DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S','now', 'localtime')), URL VARCHAR, IP VARCHAR, Status VARCHAR, Message VARCHAR, PRIMARY KEY (LogID))")
if self.loggerdb.in_transaction:
self.loggerdb.commit()
def log(self, URL: str = "", IP: str = "", Status: str = "true", Message: str = "") -> bool:
try:
loggerdbCursor = self.loggerdb.cursor()
loggerdbCursor.execute(
"INSERT INTO DashboardLog (LogID, URL, IP, Status, Message) VALUES (?, ?, ?, ?, ?);", (str(uuid.uuid4()), URL, IP, Status, Message,))
loggerdbCursor.close()
self.loggerdb.commit()
return True
except Exception as e:
print(f"[WGDashboard] Access Log Error: {str(e)}")
return False
```
## /src/modules/Email.py
```py path="/src/modules/Email.py"
import os.path
import smtplib
from email import encoders
from email.header import Header
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
class EmailSender:
def __init__(self, DashboardConfig):
self.smtp = None
self.DashboardConfig = DashboardConfig
if not os.path.exists('./attachments'):
os.mkdir('./attachments')
def Server(self):
return self.DashboardConfig.GetConfig("Email", "server")[1]
def Port(self):
return self.DashboardConfig.GetConfig("Email", "port")[1]
def Encryption(self):
return self.DashboardConfig.GetConfig("Email", "encryption")[1]
def Username(self):
return self.DashboardConfig.GetConfig("Email", "username")[1]
def Password(self):
return self.DashboardConfig.GetConfig("Email", "email_password")[1]
def SendFrom(self):
return self.DashboardConfig.GetConfig("Email", "send_from")[1]
def ready(self):
return len(self.Server()) > 0 and len(self.Port()) > 0 and len(self.Encryption()) > 0 and len(self.Username()) > 0 and len(self.Password()) > 0 and len(self.SendFrom())
def send(self, receiver, subject, body, includeAttachment = False, attachmentName = ""):
if self.ready():
try:
self.smtp = smtplib.SMTP(self.Server(), port=int(self.Port()))
self.smtp.ehlo()
if self.Encryption() == "STARTTLS":
self.smtp.starttls()
self.smtp.login(self.Username(), self.Password())
message = MIMEMultipart()
message['Subject'] = subject
message['From'] = self.SendFrom()
message["To"] = receiver
message.attach(MIMEText(body, "plain"))
if includeAttachment and len(attachmentName) > 0:
attachmentPath = os.path.join('./attachments', attachmentName)
if os.path.exists(attachmentPath):
attachment = MIMEBase("application", "octet-stream")
with open(os.path.join('./attachments', attachmentName), 'rb') as f:
attachment.set_payload(f.read())
encoders.encode_base64(attachment)
attachment.add_header("Content-Disposition", f"attachment; filename= {attachmentName}",)
message.attach(attachment)
else:
self.smtp.close()
return False, "Attachment does not exist"
self.smtp.sendmail(self.SendFrom(), receiver, message.as_string())
self.smtp.close()
return True, None
except Exception as e:
return False, f"Send failed | Reason: {e}"
return False, "SMTP not configured"
```
## /src/modules/Log.py
```py path="/src/modules/Log.py"
"""
Log Class
"""
class Log:
def __init__(self, LogID: str, JobID: str, LogDate: str, Status: str, Message: str):
self.LogID = LogID
self.JobID = JobID
self.LogDate = LogDate
self.Status = Status
self.Message = Message
def toJson(self):
return {
"LogID": self.LogID,
"JobID": self.JobID,
"LogDate": self.LogDate,
"Status": self.Status,
"Message": self.Message
}
def __dict__(self):
return self.toJson()
```
## /src/modules/PeerJob.py
```py path="/src/modules/PeerJob.py"
"""
Peer Job
"""
from datetime import datetime
class PeerJob:
def __init__(self, JobID: str, Configuration: str, Peer: str,
Field: str, Operator: str, Value: str, CreationDate: datetime, ExpireDate: datetime, Action: str):
self.Action = Action
self.ExpireDate = ExpireDate
self.CreationDate = CreationDate
self.Value = Value
self.Operator = Operator
self.Field = Field
self.Configuration = Configuration
self.Peer = Peer
self.JobID = JobID
def toJson(self):
return {
"JobID": self.JobID,
"Configuration": self.Configuration,
"Peer": self.Peer,
"Field": self.Field,
"Operator": self.Operator,
"Value": self.Value,
"CreationDate": self.CreationDate,
"ExpireDate": self.ExpireDate,
"Action": self.Action
}
def __dict__(self):
return self.toJson()
```
The content has been capped at 50000 tokens, and files over NaN bytes have been omitted. The user could consider applying other filters to refine the result. The better and more specific the context, the better the LLM can follow instructions. If the context seems verbose, the user can refine the filter using uithub. Thank you for using https://uithub.com - Perfect LLM context for any GitHub repo.