Port over local repo
This commit is contained in:
commit
676b6aa8ea
21 changed files with 1121 additions and 0 deletions
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
__pycache__
|
||||||
|
__init__.py
|
||||||
|
.bot-env
|
75
README.md
Normal file
75
README.md
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
I went a little overboard with this bot.
|
||||||
|
|
||||||
|
This started off as just a NLP test in a Discord bot however I ended up liking
|
||||||
|
it so I'm porting some additional ideas into it as well, and might just use this
|
||||||
|
for my guilds.
|
||||||
|
|
||||||
|
## Natural Language Processing
|
||||||
|
|
||||||
|
Back before the advent of interactions, I never understood why we were trying to
|
||||||
|
train users on command prefixes. It made sense to have "!do_a_thing" but it was
|
||||||
|
also unnatural.
|
||||||
|
|
||||||
|
We've long had bots that grep out phrases and act when they're made. There was
|
||||||
|
nothing stopping us from analyzing every on_message for trigger phrases and
|
||||||
|
performing actions when conditions are met.
|
||||||
|
|
||||||
|
We've already trained users on "Hey Google" and Apple has gone as far as "Siri,
|
||||||
|
".
|
||||||
|
|
||||||
|
This bot does away with commands - prefixes or UI commands.
|
||||||
|
|
||||||
|
This bot listens out for being mentioned in a Discord or being DM'd directly.
|
||||||
|
It leverages spaCy to tokenize and lemmatize the message and attempts to detect
|
||||||
|
intention from a relatively free form message.
|
||||||
|
|
||||||
|
## Cog/Action Separation
|
||||||
|
|
||||||
|
I have this issue where I'll write a Discord bot and lose a lot of time porting
|
||||||
|
over logic and commands, or just losing progress all together.
|
||||||
|
|
||||||
|
This bot separates a lot of the logic between performing actions and determining
|
||||||
|
when to perform actions.
|
||||||
|
|
||||||
|
- Cogs are used to detect intention and process messages
|
||||||
|
- Cogs just run actions
|
||||||
|
- Actions contain automation logic
|
||||||
|
|
||||||
|
This is a leadup into creating an API for managing a Discord server, and might
|
||||||
|
allow me to convert the bot into supporting OpenTofu if I'm motivated.
|
||||||
|
|
||||||
|
## REDIS
|
||||||
|
|
||||||
|
I had some issues with the MySQL helper. Latency per request was a little too
|
||||||
|
high. Those database calls added *just* enough latency to be noticable in the
|
||||||
|
Discord client, and in a few larger requests it even timed out on the
|
||||||
|
interaction reply.
|
||||||
|
|
||||||
|
REDIS is super quick and way easier to configure.
|
||||||
|
|
||||||
|
Long story but I much prefer MariaDB to REDIS for storing application data. In
|
||||||
|
my K3S clusters I can easily just delegate out table/credentials using the
|
||||||
|
MariaDB provider, and allow the provider to handle scaling for me.
|
||||||
|
|
||||||
|
Unfortunately doing this with REDIS isn't as easy. REDIS isn't designed for this
|
||||||
|
kind of thing and because of that, you'll probably need to run a REDIS pod
|
||||||
|
alongside your Bot.
|
||||||
|
|
||||||
|
But most bots are likely just docker-compose for fun or whatever. I don't see
|
||||||
|
this as massively scalable code. And REDIS simplifies the ever loving shit out
|
||||||
|
of the codes logic.
|
||||||
|
|
||||||
|
## Notice on AI
|
||||||
|
|
||||||
|
People love to jump up and say "OH BUT THIS IS AI".
|
||||||
|
|
||||||
|
Yeah no shit mate.
|
||||||
|
|
||||||
|
I use AI to skip searching for things. I get it to help along the way then I
|
||||||
|
ask it to cleanup code afterwards so that I don't have to waste time coding out
|
||||||
|
try blocks and shit.
|
||||||
|
|
||||||
|
Most of this code was just written out and then processed by AI. So fuck off
|
||||||
|
about it.
|
||||||
|
|
||||||
|
If you're curious; ChatGPT and Gemini.
|
22
docker-compose.yaml
Normal file
22
docker-compose.yaml
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
services:
|
||||||
|
|
||||||
|
app:
|
||||||
|
build: ./docker
|
||||||
|
environment:
|
||||||
|
- PYTHONUNBUFFERED=1
|
||||||
|
env_file:
|
||||||
|
- ./.bot-env
|
||||||
|
volumes:
|
||||||
|
- ./src:/app
|
||||||
|
# For testing
|
||||||
|
# Use docker compose run app
|
||||||
|
#entrypoint: /bin/bash
|
||||||
|
#stdin_open: true
|
||||||
|
#tty: true
|
||||||
|
|
||||||
|
redis:
|
||||||
|
image: redis:7
|
||||||
|
volumes:
|
||||||
|
- ./redis-data:/data
|
||||||
|
command: ["redis-server", "--appendonly", "yes"]
|
||||||
|
restart: unless-stopped
|
38
docker/Dockerfile
Normal file
38
docker/Dockerfile
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
FROM ubuntu:25.10
|
||||||
|
|
||||||
|
|
||||||
|
# Pip is really freaking annoying now.
|
||||||
|
# We'll install venv. Install all modules into the venv using the entrypoint.
|
||||||
|
|
||||||
|
|
||||||
|
# Basics to run
|
||||||
|
RUN apt update && apt -y install python3 python3-pip
|
||||||
|
|
||||||
|
# Required for voice support
|
||||||
|
RUN apt update && apt -y install libffi-dev libnacl-dev python3-dev python3.13-venv
|
||||||
|
|
||||||
|
# Create workdir
|
||||||
|
RUN mkdir -p /app
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Create venv
|
||||||
|
RUN python3 -m venv /venv
|
||||||
|
RUN chown -R ubuntu:ubuntu /venv
|
||||||
|
|
||||||
|
# Upload entrypoint
|
||||||
|
COPY entrypoint.sh /entrypoint.sh
|
||||||
|
RUN chmod +x /entrypoint.sh
|
||||||
|
RUN chown ubuntu:ubuntu /entrypoint.sh
|
||||||
|
|
||||||
|
# Pre-install requirements
|
||||||
|
COPY requirements.txt /requirements.txt
|
||||||
|
RUN chown ubuntu:ubuntu /requirements.txt
|
||||||
|
|
||||||
|
# Ubuntu comes loaded with the ubuntu user (id 1000) so just use that.
|
||||||
|
USER ubuntu
|
||||||
|
|
||||||
|
# Install requirements and spacy core
|
||||||
|
RUN /venv/bin/pip install -r /requirements.txt
|
||||||
|
RUN /venv/bin/python -m spacy download en_core_web_sm
|
||||||
|
|
||||||
|
ENTRYPOINT ["/entrypoint.sh"]
|
16
docker/entrypoint.sh
Normal file
16
docker/entrypoint.sh
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
#source /venv/bin/activate
|
||||||
|
|
||||||
|
# Install Python deps
|
||||||
|
#if [ -f /app/requirements.txt ]
|
||||||
|
#then
|
||||||
|
# python3 -m pip install --break-system-packages -r /app/requirements.txt
|
||||||
|
#fi
|
||||||
|
#
|
||||||
|
#python3 -m spacy download en_core_web_sm
|
||||||
|
|
||||||
|
#exec python3 /app/bot.py "$@"
|
||||||
|
exec /venv/bin/python /app/bot.py "$@"
|
3
docker/requirements.txt
Normal file
3
docker/requirements.txt
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
spacy
|
||||||
|
redis
|
||||||
|
discord.py[voice]
|
130
src/actions/avc.py
Normal file
130
src/actions/avc.py
Normal file
|
@ -0,0 +1,130 @@
|
||||||
|
import discord
|
||||||
|
import re
|
||||||
|
from actions.core import ActionCore
|
||||||
|
|
||||||
|
class ActionAVC(ActionCore):
|
||||||
|
|
||||||
|
def avc_parent_hkey(self, txt):
|
||||||
|
return f"avc:m:{txt}"
|
||||||
|
|
||||||
|
def avc_channel_hkey(self, txt):
|
||||||
|
return f"avc:c:{txt}"
|
||||||
|
|
||||||
|
async def avc_ismon(self, vc: discord.VoiceChannel):
|
||||||
|
"""
|
||||||
|
If we have an entry for monitoring this VC, returns True
|
||||||
|
False otherwise
|
||||||
|
"""
|
||||||
|
phkey = self.avc_parent_hkey(vc.id)
|
||||||
|
if await redis.hexists(phkey, category_key):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def avc_isman(self, vc: discord.VoiceChannel):
|
||||||
|
"""
|
||||||
|
If we have an entry for monitoring this VC, returns True
|
||||||
|
False otherwise
|
||||||
|
"""
|
||||||
|
chkey = self.avc_channel_hkey(vc.id)
|
||||||
|
if await redis.hexists(chkey, category_key):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def avc_return_parent_category(self, vc: discord.VoiceChannel):
|
||||||
|
"""
|
||||||
|
Accepts a discord.VoiceChannel as an input
|
||||||
|
Returns either a custom set category or the VoiceChannels category
|
||||||
|
|
||||||
|
Simplifies logic later when we try to determine "has the user configured a category for
|
||||||
|
newly created voice channels"
|
||||||
|
"""
|
||||||
|
# Set vars
|
||||||
|
phkey = self.avc_parent_hkey(vc.id)
|
||||||
|
category_key = "category"
|
||||||
|
|
||||||
|
if await self.redis.hexists(phkey, category_key):
|
||||||
|
category_id = await self.redis.hget(phkey, category_key)
|
||||||
|
category = await self.bot.get_channel(category_id)
|
||||||
|
else:
|
||||||
|
category = vc.category
|
||||||
|
|
||||||
|
return category
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def avc_create(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
|
||||||
|
"""
|
||||||
|
Checks to see if we're monitoring this channel.
|
||||||
|
|
||||||
|
If we are, it'll check to see where it should create a new channel.
|
||||||
|
Create the new channel. Move the user to it. Internally ID them as psuedo-owner.
|
||||||
|
|
||||||
|
If it succeeds then it returns the new VC else it will return None
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Make sure we're monitoring
|
||||||
|
if await self.avc_ismon(after):
|
||||||
|
# If we are, grab it's category
|
||||||
|
category = await self.avc_return_parent_category(after)
|
||||||
|
|
||||||
|
# Configure the channel name for the vc
|
||||||
|
channel_name = re.sub(r'[^a-z0-9]', '', member.nick.lower())
|
||||||
|
channel_name = channel_name if channel_name else "someone"
|
||||||
|
channel_name = channel_name.capitalize()
|
||||||
|
channel_name = f"{channel_name}'s VC"
|
||||||
|
|
||||||
|
# Create the voice channel
|
||||||
|
new_vc = await category.create_voice_channel(channel_name)
|
||||||
|
|
||||||
|
# Move the user to the voice channel
|
||||||
|
await member.move_to(new_vc)
|
||||||
|
|
||||||
|
# Register this new channel in REDIS
|
||||||
|
chkey = self.avc_channel_hkey(new_vc.id)
|
||||||
|
self.redis.hset(chkey, "guild", new_vc.guild.id)
|
||||||
|
self.redis.hset(chkey, "owner", member.id)
|
||||||
|
|
||||||
|
return new_vc
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def avc_cleanup(self):
|
||||||
|
"""
|
||||||
|
Runs through the REDIS server to find voice channels to delete.
|
||||||
|
|
||||||
|
Should be run frequently.
|
||||||
|
|
||||||
|
Can't imagine it needs a return for now.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async for key in self.redis.scan_iter(match="avc:c:*"):
|
||||||
|
vc_id = key.split(":")[2]
|
||||||
|
guild_id = self.redis.hget(key, "guild")
|
||||||
|
vc = self.bot.channel_by_id(guild_id, vc_id)
|
||||||
|
|
||||||
|
if len(vc.members) == 0:
|
||||||
|
vc.delete(reason="Empty. AVC Cleanup")
|
||||||
|
self.redis.hdel(key)
|
||||||
|
|
||||||
|
|
||||||
|
async def avc_monitor(self, guild_id: int, channel_id: int):
|
||||||
|
# Have to do this for redis...
|
||||||
|
await self.setup()
|
||||||
|
|
||||||
|
# Setup vars we need
|
||||||
|
channel = await self.channel_by_id(guild_id, channel_id)
|
||||||
|
phkey = self.avc_parent_hkey(channel_id)
|
||||||
|
|
||||||
|
# Check if we're about to monitor a VC just incase.
|
||||||
|
# Cog should check this anyway.
|
||||||
|
if not isinstance(channel, discord.VoiceChannel):
|
||||||
|
return False
|
||||||
|
|
||||||
|
# hset returns True if field is new or changed, else False
|
||||||
|
result = await self.redis.hset(phkey, "guild", guild_id)
|
||||||
|
if result:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
86
src/actions/core.py
Normal file
86
src/actions/core.py
Normal file
|
@ -0,0 +1,86 @@
|
||||||
|
import re
|
||||||
|
import discord
|
||||||
|
from helpers.redis import RedisHelper
|
||||||
|
|
||||||
|
|
||||||
|
class ActionCore:
|
||||||
|
def __init__(self, bot: discord.Client): # Use discord.Client or commands.Bot for compatibility
|
||||||
|
self.redis_helper = RedisHelper()
|
||||||
|
self.redis = None
|
||||||
|
self.bot = bot
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
self.redis = await self.redis_helper.data_connection()
|
||||||
|
if not self.redis:
|
||||||
|
print("Warning: Failed to establish Redis data connection for ActionCore.")
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, exc_type, exc, tb):
|
||||||
|
await self.redis_helper.__aexit__(exc_type, exc, tb)
|
||||||
|
self.redis = None
|
||||||
|
|
||||||
|
async def setup(self):
|
||||||
|
self.redis = await self.redis_helper.data_connection()
|
||||||
|
if not self.redis:
|
||||||
|
print("Warning: Failed to establish Redis data connection for ActionCore.")
|
||||||
|
|
||||||
|
def guild_by_id(self, guild_id: int) -> discord.Guild | None:
|
||||||
|
return self.bot.get_guild(guild_id)
|
||||||
|
|
||||||
|
async def member_by_id(self, guild_id: int, user_id: int) -> discord.Member | None:
|
||||||
|
guild = self.guild_by_id(guild_id)
|
||||||
|
if guild is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Try get from cache
|
||||||
|
member = guild.get_member(user_id)
|
||||||
|
if member:
|
||||||
|
return member
|
||||||
|
|
||||||
|
# Fallback to API lookup
|
||||||
|
try:
|
||||||
|
return await guild.fetch_member(user_id)
|
||||||
|
except discord.NotFound:
|
||||||
|
print(f"User {user_id} not found in guild {guild_id}")
|
||||||
|
except discord.Forbidden:
|
||||||
|
print(f"Missing permissions to fetch member in guild {guild_id}")
|
||||||
|
except discord.HTTPException as e:
|
||||||
|
print(f"HTTP error while fetching member: {e}")
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def channel_by_id(self, guild_id: int, channel_id: int) -> discord.abc.GuildChannel | None:
|
||||||
|
guild = self.guild_by_id(guild_id)
|
||||||
|
if guild is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Try cache
|
||||||
|
channel = guild.get_channel(channel_id)
|
||||||
|
if channel:
|
||||||
|
return channel
|
||||||
|
|
||||||
|
# Fallback to API
|
||||||
|
try:
|
||||||
|
return await self.bot.fetch_channel(channel_id)
|
||||||
|
except discord.NotFound:
|
||||||
|
print(f"Channel {channel_id} not found.")
|
||||||
|
except discord.Forbidden:
|
||||||
|
print("Missing permissions to fetch channel.")
|
||||||
|
except discord.HTTPException as e:
|
||||||
|
print(f"HTTP error while fetching channel: {e}")
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def remove_mentions(self, text: str) -> str:
|
||||||
|
# Patterns for user, role mentions and @everyone/@here
|
||||||
|
mention_patterns = [
|
||||||
|
r"<@!?\d+>", # user mentions, with optional '!' for nickname
|
||||||
|
r"<@&\d+>", # role mentions
|
||||||
|
r"@everyone", # everyone mention
|
||||||
|
r"@here" # here mention
|
||||||
|
]
|
||||||
|
|
||||||
|
pattern = "|".join(mention_patterns)
|
||||||
|
cleaned = re.sub(pattern, "", text, flags=re.IGNORECASE)
|
||||||
|
return cleaned.strip()
|
69
src/actions/guild.py
Normal file
69
src/actions/guild.py
Normal file
|
@ -0,0 +1,69 @@
|
||||||
|
import discord
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
async def guild_create(bot: discord.Client, guild_name: str, user_id: int):
|
||||||
|
"""
|
||||||
|
Creates a guild and sends an invite link to the specified user via DM.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bot: The bot instance.
|
||||||
|
guild_name: Name of the new guild.
|
||||||
|
user_id: Discord user ID to DM the invite to.
|
||||||
|
"""
|
||||||
|
# Create the guild
|
||||||
|
guild = await bot.create_guild(name=guild_name)
|
||||||
|
|
||||||
|
# Wait for guild to be fully available in cache (adjust time if needed)
|
||||||
|
for _ in range(10):
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
if bot.get_guild(guild.id):
|
||||||
|
break
|
||||||
|
|
||||||
|
# Try to find a text channel where we can create an invite
|
||||||
|
for channel in guild.text_channels:
|
||||||
|
if channel.permissions_for(guild.me).create_instant_invite:
|
||||||
|
invite = await channel.create_invite(max_age=3600, max_uses=1, unique=True)
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise RuntimeError("No channel found with permission to create invites.")
|
||||||
|
|
||||||
|
# Send invite via DM
|
||||||
|
user = await bot.fetch_user(user_id)
|
||||||
|
try:
|
||||||
|
await user.send(f"Here is your invite to **{guild_name}**: {invite.url}")
|
||||||
|
except discord.Forbidden:
|
||||||
|
raise RuntimeError("Unable to send DM to user.")
|
||||||
|
|
||||||
|
return guild
|
||||||
|
|
||||||
|
|
||||||
|
async def create_guild_invite(bot, guild_id):
|
||||||
|
"""
|
||||||
|
Looks for the first publicly available text channel and provides an invite to it
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bot: The bot instance.
|
||||||
|
guild_id: The guild ID to search. Bot must be inside of it.
|
||||||
|
"""
|
||||||
|
guild = bot.get_guild(guild_id)
|
||||||
|
if guild is None:
|
||||||
|
# Optionally fetch if not cached
|
||||||
|
try:
|
||||||
|
guild = await bot.fetch_guild(guild_id)
|
||||||
|
except Exception:
|
||||||
|
return None # Guild not found
|
||||||
|
|
||||||
|
# Iterate over channels in guild in order and find first channel
|
||||||
|
# where bot has permission to create invite and members can view it
|
||||||
|
for channel in guild.channels:
|
||||||
|
# Check if channel is text or voice and bot has create invite perms
|
||||||
|
if hasattr(channel, "permissions_for"):
|
||||||
|
perms = channel.permissions_for(guild.me)
|
||||||
|
if perms.create_instant_invite and perms.view_channel:
|
||||||
|
try:
|
||||||
|
invite = await channel.create_invite(max_age=3600, max_uses=1, unique=True)
|
||||||
|
return invite.url
|
||||||
|
except Exception:
|
||||||
|
# Could not create invite in this channel, try next
|
||||||
|
continue
|
||||||
|
return None
|
37
src/actions/roles.py
Normal file
37
src/actions/roles.py
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
import discord
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
async def guild_create(bot: discord.Client, guild_name: str, user_id: int):
|
||||||
|
"""
|
||||||
|
Creates a guild and sends an invite link to the specified user via DM.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bot: The bot instance.
|
||||||
|
guild_name: Name of the new guild.
|
||||||
|
user_id: Discord user ID to DM the invite to.
|
||||||
|
"""
|
||||||
|
# Create the guild
|
||||||
|
guild = await bot.create_guild(name=guild_name)
|
||||||
|
|
||||||
|
# Wait for guild to be fully available in cache (adjust time if needed)
|
||||||
|
for _ in range(10):
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
if bot.get_guild(guild.id):
|
||||||
|
break
|
||||||
|
|
||||||
|
# Try to find a text channel where we can create an invite
|
||||||
|
for channel in guild.text_channels:
|
||||||
|
if channel.permissions_for(guild.me).create_instant_invite:
|
||||||
|
invite = await channel.create_invite(max_age=3600, max_uses=1, unique=True)
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise RuntimeError("No channel found with permission to create invites.")
|
||||||
|
|
||||||
|
# Send invite via DM
|
||||||
|
user = await bot.fetch_user(user_id)
|
||||||
|
try:
|
||||||
|
await user.send(f"Here is your invite to **{guild_name}**: {invite.url}")
|
||||||
|
except discord.Forbidden:
|
||||||
|
raise RuntimeError("Unable to send DM to user.")
|
||||||
|
|
||||||
|
return guild
|
95
src/bot.py
Normal file
95
src/bot.py
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
# General libraries needed
|
||||||
|
import os
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
# Discord libraries needed
|
||||||
|
import discord
|
||||||
|
from discord.ext import commands
|
||||||
|
|
||||||
|
# Helper that identifies intents
|
||||||
|
from helpers.intent import match_intent
|
||||||
|
|
||||||
|
# Helper for logging
|
||||||
|
from helpers.logger import get_logger
|
||||||
|
|
||||||
|
# Initiate logger
|
||||||
|
logger = get_logger("bxt")
|
||||||
|
|
||||||
|
class DiscordBot:
|
||||||
|
def __init__(self):
|
||||||
|
# Load logger
|
||||||
|
self.logger = logger
|
||||||
|
self.logger.info("🚀 Logger successfully initialized")
|
||||||
|
|
||||||
|
# Get log level desired
|
||||||
|
self.LOG_LEVEL = os.environ.get('LOG_LEVEL', 'DEBUG').upper()
|
||||||
|
|
||||||
|
self.DISCORD_TOKEN = os.environ.get('DISCORD_TOKEN')
|
||||||
|
self.REDIS_SERVER = os.environ.get('REDIS_SERVER', 'localhost')
|
||||||
|
self.REDIS_PORT = os.environ.get('REDIS_PORT', '6379')
|
||||||
|
self.REDIS_CONF_DB = os.environ.get('REDIS_CONF_DB', '1')
|
||||||
|
self.REDIS_DATA_DB = os.environ.get('REDIS_DATA_DB', '0')
|
||||||
|
|
||||||
|
intents = discord.Intents.default()
|
||||||
|
intents.message_content = True
|
||||||
|
intents.guilds = True
|
||||||
|
intents.members = True
|
||||||
|
intents.voice_states = True
|
||||||
|
|
||||||
|
self.bot = commands.Bot(command_prefix="!", intents=intents)
|
||||||
|
|
||||||
|
# Register events
|
||||||
|
self.bot.event(self.on_message)
|
||||||
|
self.bot.event(self.on_ready)
|
||||||
|
|
||||||
|
async def on_message(self, message):
|
||||||
|
if message.author == self.bot.user:
|
||||||
|
return
|
||||||
|
|
||||||
|
bot_mention = f"<@{self.bot.user.id}>"
|
||||||
|
content = message.content.strip()
|
||||||
|
|
||||||
|
# Only handle if in guild and message mentions bot at start or end,
|
||||||
|
# or if it's a DM (no guild)
|
||||||
|
if message.guild and not (content.startswith(bot_mention) or content.endswith(bot_mention)):
|
||||||
|
return
|
||||||
|
|
||||||
|
# Remove bot mention from content
|
||||||
|
content = content.replace(bot_mention, "").strip()
|
||||||
|
|
||||||
|
# Split message content into lines and process each
|
||||||
|
lines = [line.strip() for line in content.split('\n') if line.strip()]
|
||||||
|
|
||||||
|
ctx = await self.bot.get_context(message)
|
||||||
|
|
||||||
|
handled_any = False
|
||||||
|
for line in lines:
|
||||||
|
func, doc = match_intent(line)
|
||||||
|
if func:
|
||||||
|
handled_any = True
|
||||||
|
try:
|
||||||
|
await func(ctx, doc)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.debug(f"Error running intent handler: {e}")
|
||||||
|
else:
|
||||||
|
await message.channel.send(f"Sorry, I didn't understand that: `{line}`")
|
||||||
|
|
||||||
|
await self.bot.process_commands(message)
|
||||||
|
|
||||||
|
async def on_ready(self):
|
||||||
|
self.logger.info(f"Logged in as {self.bot.user} (ID: {self.bot.user.id})")
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
# Load cogs (adjust the names/paths to your setup)
|
||||||
|
await self.bot.load_extension("cogs.cogcontrol")
|
||||||
|
await self.bot.load_extension("cogs.admin")
|
||||||
|
await self.bot.load_extension("cogs.guild")
|
||||||
|
await self.bot.load_extension("cogs.autovoicechannel")
|
||||||
|
|
||||||
|
await self.bot.start(self.DISCORD_TOKEN)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
discord_bot = DiscordBot()
|
||||||
|
asyncio.run(discord_bot.start())
|
||||||
|
|
132
src/cogs/admin.py
Normal file
132
src/cogs/admin.py
Normal file
|
@ -0,0 +1,132 @@
|
||||||
|
# cogs/admin.py
|
||||||
|
|
||||||
|
# Helpers for intent matching
|
||||||
|
from helpers.intent import intent
|
||||||
|
from helpers.intent_cog import IntentCog
|
||||||
|
|
||||||
|
# RE for stripping phrasing out
|
||||||
|
import re
|
||||||
|
|
||||||
|
class AdminCog(IntentCog):
|
||||||
|
def __init__(self, bot):
|
||||||
|
self.bot = bot
|
||||||
|
|
||||||
|
# ------------
|
||||||
|
# Help Message
|
||||||
|
# ------------
|
||||||
|
|
||||||
|
@intent([["admin"], ["help"]])
|
||||||
|
async def guild_help(self, ctx, doc):
|
||||||
|
await ctx.send("Full help is not really possible at this stage. I'll try to understand what you're saying and match it to what you're wanting to do.")
|
||||||
|
lines = []
|
||||||
|
for i, intent in enumerate(list_intents(), start=1):
|
||||||
|
pattern_str = " + ".join(f"[{', '.join(group)}]" for group in intent["pattern"])
|
||||||
|
lines.append(f"{i}. `{intent['func_name']}` from **{intent['cog']}** → {pattern_str}")
|
||||||
|
|
||||||
|
message = "\n".join(lines) or "No intents registered."
|
||||||
|
await ctx.send(f"**Registered Intents:**\n{message}")
|
||||||
|
|
||||||
|
# ------------
|
||||||
|
|
||||||
|
@intent([["kick", "remove"], ["user", "member"]])
|
||||||
|
async def user_kick(self, ctx, doc):
|
||||||
|
# Expected phrase to match
|
||||||
|
# kick user @mention
|
||||||
|
if ctx.guild == None:
|
||||||
|
await ctx.send("I can't kick users from DMs")
|
||||||
|
return
|
||||||
|
|
||||||
|
targets = [m for m in ctx.message.mentions if m.id != ctx.bot.user.id]
|
||||||
|
if targets:
|
||||||
|
for target in targets:
|
||||||
|
await target.kick()
|
||||||
|
await ctx.send(f"I've kicked: {' '.join(target.mention for target in targets)}")
|
||||||
|
else:
|
||||||
|
await ctx.send("You need to @mention someone to kick them.")
|
||||||
|
|
||||||
|
@intent([["ban"], ["user", "member"]])
|
||||||
|
async def user_ban(self, ctx, doc):
|
||||||
|
# Expected phrase to match
|
||||||
|
# ban user @mention
|
||||||
|
if ctx.guild == None:
|
||||||
|
await ctx.send("I can't ban users from DMs")
|
||||||
|
return
|
||||||
|
|
||||||
|
targets = [m for m in ctx.message.mentions if m.id != ctx.bot.user.id]
|
||||||
|
if targets:
|
||||||
|
for target in targets:
|
||||||
|
await target.ban()
|
||||||
|
await ctx.send(f"I've banned: {' '.join(target.mention for target in targets)}")
|
||||||
|
else:
|
||||||
|
await ctx.send("You need to @mention someone to ban them.")
|
||||||
|
|
||||||
|
@intent([["unban"], ["user", "member"]])
|
||||||
|
async def user_unban(self, ctx, doc):
|
||||||
|
output = []
|
||||||
|
|
||||||
|
if ctx.guild is None:
|
||||||
|
await ctx.send("I can't unban users from DMs. You have to do it within your guild, preferably in your staff chat.")
|
||||||
|
return
|
||||||
|
|
||||||
|
user_ids = re.findall(r'\b\d+\b', ctx.message.content)
|
||||||
|
if not user_ids:
|
||||||
|
output.append("I couldn't find any user IDs in your message.")
|
||||||
|
else:
|
||||||
|
banned_users = {}
|
||||||
|
async for ban_entry in ctx.guild.bans():
|
||||||
|
banned_users[ban_entry.user.id] = ban_entry.user
|
||||||
|
|
||||||
|
for user_id in user_ids:
|
||||||
|
user_id = int(user_id)
|
||||||
|
if user_id not in banned_users:
|
||||||
|
output.append(f"User ID {user_id} is not currently banned.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
user = banned_users[user_id]
|
||||||
|
try:
|
||||||
|
await ctx.guild.unban(user)
|
||||||
|
output.append(f"Unbanned user: {user.name}")
|
||||||
|
except Exception as e:
|
||||||
|
output.append(f"Failed to unban user {user_id}: {e}")
|
||||||
|
|
||||||
|
await ctx.send("\n".join(output))
|
||||||
|
|
||||||
|
|
||||||
|
@intent([["give", "grant"], ["role"]])
|
||||||
|
async def user_give_role(self, ctx, doc):
|
||||||
|
# Expected phrase to match:
|
||||||
|
# give role 123 to user @mention
|
||||||
|
output = []
|
||||||
|
|
||||||
|
if ctx.guild == None:
|
||||||
|
output.append("There aren't any roles in DMs")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Extracts a single role ID succeeding phrase "id [0-9]+"
|
||||||
|
role_match = re.search(r'\b(?:id\s+)?(\d{3,})\b', ctx.message.content)
|
||||||
|
role_id = int(role_match.group(1)) if role_match else None
|
||||||
|
|
||||||
|
# Extracts the mention of a single user following the phrase "user @user"
|
||||||
|
user_match = re.search(r'\bto\s+(?:user\s+)?(<@!?\d+>)', ctx.message.content)
|
||||||
|
user_id = user_match.group(1) if user_match else None
|
||||||
|
|
||||||
|
if not role_id:
|
||||||
|
output.append("Could not find a role ID in your message")
|
||||||
|
else:
|
||||||
|
output.append(f"Role detected: {role_id}")
|
||||||
|
|
||||||
|
if not user_id:
|
||||||
|
output.append("Could not find a user mention in your message")
|
||||||
|
else:
|
||||||
|
output.append(f"User detected: {user_id}")
|
||||||
|
|
||||||
|
output.append("Command completed")
|
||||||
|
|
||||||
|
await ctx.send(output)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def setup(bot):
|
||||||
|
await bot.add_cog(AdminCog(bot))
|
||||||
|
|
104
src/cogs/autovoicechannel.py
Normal file
104
src/cogs/autovoicechannel.py
Normal file
|
@ -0,0 +1,104 @@
|
||||||
|
# Discord libraries
|
||||||
|
import discord
|
||||||
|
from discord.ext import commands
|
||||||
|
|
||||||
|
# Helpers for NLP intent declaration
|
||||||
|
from helpers.intent import intent, list_intents
|
||||||
|
from helpers.intent_cog import IntentCog
|
||||||
|
|
||||||
|
# Helper for logging
|
||||||
|
import uuid
|
||||||
|
import helpers.logger as log_helper
|
||||||
|
|
||||||
|
# Action class to centralize actions the bot can take
|
||||||
|
from actions.avc import ActionAVC
|
||||||
|
|
||||||
|
# Finally, class!
|
||||||
|
class AutoVoiceChannelCog(IntentCog):
|
||||||
|
def __init__(self, bot):
|
||||||
|
self.bot = bot
|
||||||
|
self.OWNERS = [214577944570494976, 1088598275562668195]
|
||||||
|
self.action = ActionAVC(bot)
|
||||||
|
self.logger = log_helper.get_logger(__name__)
|
||||||
|
self.logger.debug("AutoVoiceChannelCog initialized")
|
||||||
|
|
||||||
|
@commands.Cog.listener()
|
||||||
|
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
|
||||||
|
"""
|
||||||
|
Listens for a change in voice state. If a user is joining a new server and that server is considered "monitored",
|
||||||
|
we'll create a new VC for that person.
|
||||||
|
"""
|
||||||
|
# Creates UUID to make log help
|
||||||
|
CMDID = uuid.uuid1()
|
||||||
|
|
||||||
|
self.logger.debug(f"[{CMDID}] avc on_voice_state_update triggered")
|
||||||
|
# Detect if user has left a channel
|
||||||
|
if after.channel is None:
|
||||||
|
self.logger.debug(f"[{CMDID}] {member.name} disconnected from a channel.")
|
||||||
|
self.logger.debug(f"[{CMDID}] Running clean up just to be an over achiever.")
|
||||||
|
await self.action.avc_cleanup()
|
||||||
|
|
||||||
|
# Detect if user has joined a new channel
|
||||||
|
if after.channel is not None:
|
||||||
|
self.logger.debug(f"[{CMDID}] User appears to be present in a channel.")
|
||||||
|
if before.channel is not None:
|
||||||
|
self.logger.debug(f"[{CMDID}] before was set, so treating this carefully.")
|
||||||
|
if after.channel.id != before.channel.id:
|
||||||
|
self.logger.debug(f"[{CMDID}] User appears to have moved VC and not just muted etc.")
|
||||||
|
if await self.action.avc_ismon(after.channel):
|
||||||
|
await self.action.avc_create(member, before, after)
|
||||||
|
|
||||||
|
if before.channel is None:
|
||||||
|
self.logger.debug(f"[{CMDID}] User appears to have newly joined a VC")
|
||||||
|
if await self.action.avc_ismon(after.channel):
|
||||||
|
await self.action.avc_create(member, before, after)
|
||||||
|
|
||||||
|
@intent([["avc"], ["monitor"]])
|
||||||
|
async def avc_monitor_intent(self, ctx, doc):
|
||||||
|
"""
|
||||||
|
Expected phrase: avc monitor <channel_id>
|
||||||
|
or: avc monitor 1234
|
||||||
|
|
||||||
|
Will check to make sure that the person is a guild admin of some sort.
|
||||||
|
"""
|
||||||
|
UUID = log_helper.get_uuid()
|
||||||
|
self.logger.debug(f"[{UUID}] avc_monitor_intent called with message: {doc}")
|
||||||
|
|
||||||
|
if not ctx.guild:
|
||||||
|
await ctx.reply("This has to be run inside of a guild.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not ctx.author.guild_permissions.administrator:
|
||||||
|
await ctx.reply("You are not an admin in this Discord.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
self.logger.debug(f"[{UUID}] checked command is run in a guild and author has admin to the guild.")
|
||||||
|
|
||||||
|
# Extract channel ID
|
||||||
|
channel_id = self.action.remove_mentions(doc.text)
|
||||||
|
channel_id = int(' '.join(channel_id.split()[2:]))
|
||||||
|
self.logger.debug(f"[{UUID}] Extracted {channel_id} as the channel ID")
|
||||||
|
|
||||||
|
# Combine a channel for ZERO reason
|
||||||
|
channel = await self.action.channel_by_id(ctx.guild.id, int(channel_id))
|
||||||
|
self.logger.debug(f"[{UUID}] Found {channel.name} ({channel.id}) in {ctx.guild.name} ({ctx.guild.id})")
|
||||||
|
|
||||||
|
# Check to see if we're already monitoring it
|
||||||
|
if await self.action.avc_ismon(channel):
|
||||||
|
await ctx.reply("I'm already monitoring that channel.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if isinstance(channel, discord.VoiceChannel):
|
||||||
|
if await self.action.avc_monitor(ctx.guild.id, channel.id):
|
||||||
|
await ctx.reply(f"I'm now monitoring {channel.name}")
|
||||||
|
else:
|
||||||
|
await ctx.reply(f"I could not monitor {channel.name}. Try again later")
|
||||||
|
else:
|
||||||
|
await ctx.reply(f"{channel.name} is not a voice channel.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def setup(bot):
|
||||||
|
await bot.add_cog(AutoVoiceChannelCog(bot))
|
||||||
|
|
88
src/cogs/cogcontrol.py
Normal file
88
src/cogs/cogcontrol.py
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
# Helpers for NLP intent declaration
|
||||||
|
from helpers.intent import intent, list_intents
|
||||||
|
from helpers.intent_cog import IntentCog
|
||||||
|
|
||||||
|
# Helper for logging
|
||||||
|
import helpers.logger as log_helper
|
||||||
|
|
||||||
|
class ControlCog(IntentCog):
|
||||||
|
def __init__(self, bot):
|
||||||
|
self.bot = bot
|
||||||
|
self.OWNERS = [214577944570494976, 1088598275562668195]
|
||||||
|
self.logger = log_helper.get_logger(__name__)
|
||||||
|
|
||||||
|
@intent([["cog"], ["reload"]])
|
||||||
|
async def cog_reload_intent(self, ctx, doc):
|
||||||
|
# Set UUID for this operation
|
||||||
|
UUID = log_helper.get_uuid()
|
||||||
|
|
||||||
|
self.logger.debug(f"[{UUID}] Cog load called. Command: {doc}")
|
||||||
|
|
||||||
|
if ctx.author.id not in self.OWNERS:
|
||||||
|
await ctx.reply('Sorry but you are not authorized')
|
||||||
|
return
|
||||||
|
|
||||||
|
words = ctx.message.content.split()
|
||||||
|
cog_name = " ".join(words[2:]).strip()
|
||||||
|
cog_path = f"cogs.{cog_name}"
|
||||||
|
|
||||||
|
if cog_path in self.bot.extensions:
|
||||||
|
try:
|
||||||
|
await self.bot.reload_extension(cog_path)
|
||||||
|
await ctx.reply(f"Reloaded {cog_name}")
|
||||||
|
except Exception as e:
|
||||||
|
await ctx.reply(f"Error reloading {cog_name}: {e}")
|
||||||
|
else:
|
||||||
|
await ctx.reply(f"Cog {cog_name} is not loaded")
|
||||||
|
|
||||||
|
@intent([["cog"], ["load"]])
|
||||||
|
async def cog_load_intent(self, ctx, doc):
|
||||||
|
# Set UUID for this operation
|
||||||
|
UUID = log_helper.get_uuid()
|
||||||
|
|
||||||
|
self.logger.debug(f"[{UUID}] Cog load called. Command: {doc}")
|
||||||
|
|
||||||
|
if ctx.author.id not in self.OWNERS:
|
||||||
|
await ctx.reply('Sorry but you are not authorized')
|
||||||
|
return
|
||||||
|
|
||||||
|
words = ctx.message.content.split()
|
||||||
|
cog_name = " ".join(words[2:]).strip()
|
||||||
|
cog_path = f"cogs.{cog_name}"
|
||||||
|
|
||||||
|
if cog_path not in self.bot.extensions:
|
||||||
|
try:
|
||||||
|
await self.bot.load_extension(cog_path)
|
||||||
|
await ctx.reply(f"Loaded {cog_name}")
|
||||||
|
except Exception as e:
|
||||||
|
await ctx.reply(f"Error loading {cog_name}: {e}")
|
||||||
|
else:
|
||||||
|
await ctx.reply(f"Cog {cog_name} is already loaded")
|
||||||
|
|
||||||
|
@intent([["cog"], ["unload"]])
|
||||||
|
async def cog_unload_intent(self, ctx, doc):
|
||||||
|
# Set UUID for this operation
|
||||||
|
UUID = log_helper.get_uuid()
|
||||||
|
|
||||||
|
self.logger.debug(f"[{UUID}] Cog load called. Command: {doc}")
|
||||||
|
|
||||||
|
if ctx.author.id not in self.OWNERS:
|
||||||
|
await ctx.reply('Sorry but you are not authorized')
|
||||||
|
return
|
||||||
|
|
||||||
|
words = ctx.message.content.split()
|
||||||
|
cog_name = " ".join(words[2:]).strip()
|
||||||
|
cog_path = f"cogs.{cog_name}"
|
||||||
|
|
||||||
|
if cog_path in self.bot.extensions:
|
||||||
|
try:
|
||||||
|
await self.bot.unload_extension(cog_path)
|
||||||
|
await ctx.reply(f"Unloaded {cog_name}")
|
||||||
|
except Exception as e:
|
||||||
|
await ctx.reply(f"Error unloading {cog_name}: {e}")
|
||||||
|
else:
|
||||||
|
await ctx.reply(f"Cog {cog_name} is not loaded")
|
||||||
|
|
||||||
|
async def setup(bot):
|
||||||
|
await bot.add_cog(ControlCog(bot))
|
||||||
|
|
36
src/cogs/guild.py
Normal file
36
src/cogs/guild.py
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
# Helpers for NLP intent declaration
|
||||||
|
from helpers.intent import intent, list_intents
|
||||||
|
from helpers.intent_cog import IntentCog
|
||||||
|
|
||||||
|
# Actions that intents can trigger
|
||||||
|
import actions.guild as guild_actions
|
||||||
|
|
||||||
|
|
||||||
|
class GuildCog(IntentCog):
|
||||||
|
def __init__(self, bot):
|
||||||
|
self.bot = bot
|
||||||
|
|
||||||
|
@intent([["guild"], ["help"]])
|
||||||
|
async def guild_help_intent(self, ctx, doc):
|
||||||
|
await ctx.send("Full help is not really possible at this stage. I'll try to understand what you're saying and match it to what you're wanting to do.")
|
||||||
|
lines = []
|
||||||
|
for i, intent in enumerate(list_intents(), start=1):
|
||||||
|
pattern_str = " + ".join(f"[{', '.join(group)}]" for group in intent["pattern"])
|
||||||
|
lines.append(f"{i}. `{intent['func_name']}` from **{intent['cog']}** → {pattern_str}")
|
||||||
|
|
||||||
|
message = "\n".join(lines) or "No intents registered."
|
||||||
|
await ctx.send(f"**Registered Intents:**\n{message}")
|
||||||
|
|
||||||
|
@intent([["guild"], ["create"]])
|
||||||
|
async def guild_create_intent(self, ctx, doc):
|
||||||
|
# Expected phrase: create guild "asd"
|
||||||
|
# or: guild create "asd"
|
||||||
|
# Will not work for anyone but kpro or PM
|
||||||
|
OWNERS=['214577944570494976','1088598275562668195']
|
||||||
|
OWNERS=['214577944570494976']
|
||||||
|
if ctx.author.id not in OWNERS:
|
||||||
|
await ctx.reply('Sorry but you are not authorized')
|
||||||
|
|
||||||
|
async def setup(bot):
|
||||||
|
await bot.add_cog(GuildCog(bot))
|
||||||
|
|
43
src/helpers/intent.py
Normal file
43
src/helpers/intent.py
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
import spacy
|
||||||
|
|
||||||
|
# Helper for logging
|
||||||
|
import uuid
|
||||||
|
from helpers.logger import get_logger
|
||||||
|
|
||||||
|
# Initiate logger
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
intent_handlers = []
|
||||||
|
nlp = spacy.load("en_core_web_sm")
|
||||||
|
|
||||||
|
def intent(keyword_groups):
|
||||||
|
def decorator(func):
|
||||||
|
func._intent_pattern = keyword_groups # Mark function with pattern
|
||||||
|
return func
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
def match_intent(text):
|
||||||
|
doc = nlp(text.lower())
|
||||||
|
#tokens = [token.lemma_ for token in doc if not token.is_stop and not token.is_punct]
|
||||||
|
tokens = [token.lemma_ for token in doc if not token.is_punct]
|
||||||
|
print(f"Tokens: {tokens}") # DEBUG
|
||||||
|
|
||||||
|
for handler in intent_handlers:
|
||||||
|
pattern = handler["pattern"]
|
||||||
|
if all(any(kw in tokens for kw in group) for group in pattern):
|
||||||
|
print(f"Matched intent: {handler['func'].__name__}") # DEBUG
|
||||||
|
return handler["func"], doc
|
||||||
|
print("No intent matched") # DEBUG
|
||||||
|
return None, doc
|
||||||
|
|
||||||
|
|
||||||
|
def list_intents():
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
"pattern": handler["pattern"],
|
||||||
|
"func_name": handler["func"].__name__,
|
||||||
|
"cog": handler["func"].__self__.__class__.__name__ if hasattr(handler["func"], "__self__") else None,
|
||||||
|
}
|
||||||
|
for handler in intent_handlers
|
||||||
|
]
|
||||||
|
|
16
src/helpers/intent_cog.py
Normal file
16
src/helpers/intent_cog.py
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
from discord.ext import commands
|
||||||
|
from helpers.intent import intent_handlers
|
||||||
|
|
||||||
|
class IntentCog(commands.Cog):
|
||||||
|
def cog_load(self):
|
||||||
|
# Clear existing handlers for this cog, if you want to avoid duplicates:
|
||||||
|
# intent_handlers[:] = [h for h in intent_handlers if h.get("func").__self__ != self]
|
||||||
|
|
||||||
|
for attr_name in dir(self):
|
||||||
|
attr = getattr(self, attr_name)
|
||||||
|
if callable(attr) and hasattr(attr, "_intent_pattern"):
|
||||||
|
intent_handlers.append({
|
||||||
|
"pattern": attr._intent_pattern,
|
||||||
|
"func": attr, # Bound method (includes self)
|
||||||
|
})
|
||||||
|
|
27
src/helpers/logger.py
Normal file
27
src/helpers/logger.py
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
# helpers/logger.py
|
||||||
|
import os
|
||||||
|
import uuid
|
||||||
|
import logging
|
||||||
|
|
||||||
|
def get_logger(name="discord_bot"):
|
||||||
|
log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
|
||||||
|
level = getattr(logging, log_level, logging.INFO)
|
||||||
|
|
||||||
|
logger = logging.getLogger(name)
|
||||||
|
logger.setLevel(level)
|
||||||
|
|
||||||
|
if not logger.handlers:
|
||||||
|
handler = logging.StreamHandler()
|
||||||
|
handler.setLevel(level)
|
||||||
|
formatter = logging.Formatter(
|
||||||
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||||
|
)
|
||||||
|
handler.setFormatter(formatter)
|
||||||
|
logger.addHandler(handler)
|
||||||
|
|
||||||
|
logger.debug("Logger initialized")
|
||||||
|
return logger
|
||||||
|
|
||||||
|
|
||||||
|
def get_uuid():
|
||||||
|
return uuid.uuid1()
|
57
src/helpers/redis.py
Normal file
57
src/helpers/redis.py
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
import os
|
||||||
|
import redis.asyncio as aioredis # or 'import aioredis' if using legacy version
|
||||||
|
|
||||||
|
|
||||||
|
class RedisHelper:
|
||||||
|
def __init__(self):
|
||||||
|
self.REDIS_SERVER = os.environ.get('REDIS_SERVER', 'localhost')
|
||||||
|
self.REDIS_PORT = int(os.environ.get('REDIS_PORT', '6379'))
|
||||||
|
self.REDIS_CONF_DB = int(os.environ.get('REDIS_CONF_DB', '1'))
|
||||||
|
self.REDIS_DATA_DB = int(os.environ.get('REDIS_DATA_DB', '0'))
|
||||||
|
|
||||||
|
self._data_connection = None
|
||||||
|
self._conf_connection = None
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
# Preload both connections for use inside async with block
|
||||||
|
await self.data_connection()
|
||||||
|
await self.conf_connection()
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, exc_type, exc, tb):
|
||||||
|
if self._data_connection:
|
||||||
|
await self._data_connection.close()
|
||||||
|
self._data_connection = None
|
||||||
|
|
||||||
|
if self._conf_connection:
|
||||||
|
await self._conf_connection.close()
|
||||||
|
self._conf_connection = None
|
||||||
|
|
||||||
|
async def data_connection(self):
|
||||||
|
if self._data_connection is None:
|
||||||
|
try:
|
||||||
|
self._data_connection = aioredis.Redis.from_url(
|
||||||
|
f"redis://{self.REDIS_SERVER}:{self.REDIS_PORT}/{self.REDIS_DATA_DB}",
|
||||||
|
decode_responses=True
|
||||||
|
)
|
||||||
|
await self._data_connection.ping()
|
||||||
|
print("Redis data connection established successfully.")
|
||||||
|
except aioredis.RedisError as e:
|
||||||
|
print(f"Error connecting to Redis data DB: {e}")
|
||||||
|
self._data_connection = None
|
||||||
|
return self._data_connection
|
||||||
|
|
||||||
|
async def conf_connection(self):
|
||||||
|
if self._conf_connection is None:
|
||||||
|
try:
|
||||||
|
self._conf_connection = aioredis.Redis.from_url(
|
||||||
|
f"redis://{self.REDIS_SERVER}:{self.REDIS_PORT}/{self.REDIS_CONF_DB}",
|
||||||
|
decode_responses=True
|
||||||
|
)
|
||||||
|
await self._conf_connection.ping()
|
||||||
|
print("Redis config connection established successfully.")
|
||||||
|
except aioredis.RedisError as e:
|
||||||
|
print(f"Error connecting to Redis conf DB: {e}")
|
||||||
|
self._conf_connection = None
|
||||||
|
return self._conf_connection
|
||||||
|
|
3
src/requirements.txt
Normal file
3
src/requirements.txt
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
spacy
|
||||||
|
redis
|
||||||
|
discord.py[voice]
|
41
src/testing.py
Normal file
41
src/testing.py
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
import spacy
|
||||||
|
|
||||||
|
nlp = spacy.load("en_core_web_sm")
|
||||||
|
|
||||||
|
# Define intent keywords
|
||||||
|
INTENTS = {
|
||||||
|
"create_guild": [["create", "make", "new"], ["guild"]],
|
||||||
|
"list_guilds": [["list", "show", "display"], ["guild"]],
|
||||||
|
}
|
||||||
|
|
||||||
|
def detect_intent(text):
|
||||||
|
doc = nlp(text.lower())
|
||||||
|
tokens = [token.lemma_ for token in doc if not token.is_stop]
|
||||||
|
|
||||||
|
for intent, keyword_groups in INTENTS.items():
|
||||||
|
# Check if any keyword in each group is present
|
||||||
|
if all(any(kw in tokens for kw in group) for group in keyword_groups):
|
||||||
|
return intent
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
def analyze(text):
|
||||||
|
doc = nlp(text.lower())
|
||||||
|
|
||||||
|
print("\nChecking for stop words")
|
||||||
|
for token in doc:
|
||||||
|
print(f"{token.text} (is_stop={token.is_stop})")
|
||||||
|
|
||||||
|
print("\nChecking for punctuation")
|
||||||
|
for token in doc:
|
||||||
|
print(f"{token.text} (is_punct={token.is_punct})")
|
||||||
|
|
||||||
|
print("\nChecking intent")
|
||||||
|
intent = detect_intent(text)
|
||||||
|
print(f"{text} ({intent})")
|
||||||
|
|
||||||
|
#print(detect_intent("list guilds"))
|
||||||
|
analyze("list guilds")
|
||||||
|
|
||||||
|
print("give" in nlp.Defaults.stop_words)
|
||||||
|
print(nlp("give")[0].is_stop)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue