Compare commits

3 Commits

Author SHA1 Message Date
James H
39eeedc5f9 Fix typing for py39 2023-04-08 00:52:45 +01:00
James H
2d5a58f996 Audio, TTS 2023-04-08 00:35:31 +01:00
James H
0f3416c11b Add initial code 2021-11-29 23:13:52 +00:00
10 changed files with 529 additions and 0 deletions

3
.gitignore vendored
View File

@@ -121,3 +121,6 @@ dmypy.json
# Bot # Bot
token.txt token.txt
sounds/
tmp/
tts/

8
config.json Normal file
View File

@@ -0,0 +1,8 @@
{
"command_prefix": "$",
"token": "",
"sounds_dir": "./sounds",
"azure_speech_key": "",
"azure_speech_region": "",
"azure_speech_voice": ""
}

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
discord.py[voice]==2.2.2
PyNaCl==1.5.0
yt-dlp==2023.3.4
azure-cognitiveservices-speech==1.27.0

304
src/commands/audio.py Normal file
View File

@@ -0,0 +1,304 @@
import asyncio
import azure.cognitiveservices.speech as speechsdk
import discord
import os
import random
import time
import yt_dlp
from typing import Optional
from commands.command import Command
# Taken from https://github.com/Rapptz/discord.py/blob/master/examples/basic_voice.py
# Suppress noise about console usage from errors
yt_dlp.utils.bug_reports_message = lambda: ''
ytdl_format_options = {
'format': 'bestaudio/best',
'outtmpl': './tmp/%(extractor)s-%(id)s-%(title)s.%(ext)s',
'restrictfilenames': True,
'noplaylist': True,
'nocheckcertificate': True,
'ignoreerrors': False,
'logtostderr': False,
'quiet': True,
'no_warnings': True,
'default_search': 'auto',
'source_address': '0.0.0.0', # bind to ipv4 since ipv6 addresses cause issues sometimes
}
ffmpeg_options = {
'options': '-vn',
}
ytdl = yt_dlp.YoutubeDL(ytdl_format_options)
def get_channel_connection(connections: list, channel_id: int) -> Optional[dict]:
for connection in connections:
if connection['id'] == channel_id:
return connection
return None
def remove_channel_connection(connections: list, channel_id: int) -> None:
connection = get_channel_connection(connections, channel_id)
if connection is not None:
connections.remove(connection)
class YTDLSource(discord.PCMVolumeTransformer):
def __init__(self, source, *, data, volume=0.5):
super().__init__(source, volume)
self.data = data
self.title = data.get('title')
self.url = data.get('url')
@classmethod
async def from_url(cls, url, *, loop=None, stream=False):
loop = loop or asyncio.get_event_loop()
data = await loop.run_in_executor(None, lambda: ytdl.extract_info(url, download=not stream))
if 'entries' in data:
# take first item from a playlist
data = data['entries'][0]
filename = data['url'] if stream else ytdl.prepare_filename(data)
return cls(discord.FFmpegPCMAudio(filename, **ffmpeg_options), data=data)
class Play(Command):
def __init__(self):
super().__init__()
self.name = 'play'
self.display_name = 'Play'
self.description = 'Play a sound via your voice channel, either locally or from a URL. https://ytb-dl.github.io/ytb-dl/supportedsites.html'
self.triggers = ['play', 'p']
self.usage = '{0}play https://www.youtube.com/watch?v=dQw4w9WgXcQ'
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
async def play(sound: str, connection: dict):
# Check TTS
if sound.startswith('tts:'):
filename = sound.replace('tts:', '', 1)
if os.path.exists(filename):
player = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(filename, **ffmpeg_options))
connection['client'].play(player, after=player_finished)
else:
# Check local first
for file in os.listdir(config['sounds_dir']):
if file == sound:
full_path = f'{config["sounds_dir"]}/{file}'
if os.path.isfile(file):
player = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(full_path, **ffmpeg_options))
connection['client'].play(player, after=player_finished)
else:
dir_files = os.listdir(full_path)
random_file = dir_files[random.randint(0, len(dir_files) - 1)]
full_dir_path = f'{full_path}/{random_file}'
player = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(full_dir_path, **ffmpeg_options))
connection['client'].play(player, after=player_finished)
if not connection['client'].is_playing():
# Assume if we're not playing then it couldn't find one
player = await YTDLSource.from_url(sound, stream=False)
connection['client'].play(player, after=player_finished)
def player_finished(e: Exception):
voice_channel = message.author.voice.channel
connection = get_channel_connection(config['voice']['connections'], voice_channel.id)
if e:
print(f'Player error: {e}')
elif len(connection['queue']) > 0:
# Play next
next = connection['queue'][0]
connection['queue'].remove(next)
asyncio.run_coroutine_threadsafe(play(next, connection), bot.loop)
else:
# Disconnect
asyncio.run_coroutine_threadsafe(connection['client'].disconnect(), bot.loop)
remove_channel_connection(config['voice']['connections'], voice_channel.id)
pass
async with message.channel.typing():
if message.author.voice:
voice_channel = message.author.voice.channel
# Connect to voice
if config.get('voice') is None:
config['voice'] = {
'connections': []
}
# Are we connected?
connection = get_channel_connection(config['voice']['connections'], voice_channel.id)
if not connection:
client = await voice_channel.connect()
connection = {'id': voice_channel.id, 'queue': [], 'client': client}
config['voice']['connections'].append(connection)
# Are we playing?
if connection['client'].is_playing():
connection['queue'].append(content)
else:
await play(content, connection)
await message.add_reaction('')
else:
await message.reply('You are not connected to a voice channel!')
class Stop(Command):
def __init__(self):
super().__init__()
self.name = 'stop'
self.display_name = 'Stop'
self.description = 'Stop what\'s currently playing in your voice channel'
self.triggers = ['stop', 's']
self.usage = '{0}stop'
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
if message.author.voice:
voice_channel = message.author.voice.channel
connection = get_channel_connection(config['voice']['connections'], voice_channel.id)
if connection is not None:
await connection['client'].disconnect()
remove_channel_connection(config['voice']['connections'], voice_channel.id)
class List(Command):
def __init__(self):
super().__init__()
self.name = 'list'
self.display_name = 'List'
self.description = 'List available saved sounds'
self.triggers = ['list', 'l']
self.usage = '{0}list'
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
sounds = os.listdir(config['sounds_dir'])
for i in range(0, len(sounds), 100):
output = '\n'.join(sounds[i:i+100])
output = f'```{output}```'
await message.reply(output)
class Add(Command):
def __init__(self):
super().__init__()
self.name = 'add'
self.display_name = 'Add'
self.description = 'Save a sound to the bot'
self.triggers = ['add']
self.hidden = True
self.usage = '{0}add sound_name https://www.youtube.com/watch?v=dQw4w9WgXcQ'
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
global ytdl_format_options
sound_details = content.split(' ', 1)
sound_name = sound_details[0]
sound_url = sound_details[1]
ytdl_args = ytdl_format_options.copy()
ytdl_args['outtmpl'] = f'./{config["sounds_dir"]}/{sound_name}/%(extractor)s-%(id)s-%(title)s.%(ext)s'
add_ytdl = yt_dlp.YoutubeDL(ytdl_args)
add_ytdl.download([sound_url])
await message.add_reaction('')
class Queue(Command):
def __init__(self):
super().__init__()
self.name = 'queue'
self.display_name = 'Queue'
self.description = 'List the play queue for your voice channel'
self.triggers = ['queue', 'q']
self.usage = '{0}queue'
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
if config.get('voice') is not None and message.author.voice:
voice_channel = message.author.voice.channel
connection = get_channel_connection(config['voice']['connections'], voice_channel.id)
if connection:
queue: list = connection['queue']
output = ''
if len(queue) > 0:
output = f'Queue for {voice_channel.name}\n'
for queued in queue:
output += f' - {queued}\n'
else:
output = 'https://media.tenor.com/F5MOelsQd2IAAAAC/patrick-star.gif'
await message.reply(output)
class Skip(Command):
def __init__(self):
super().__init__()
self.name = 'skip'
self.display_name = 'Skip'
self.description = 'Skip to the next sound in the queue'
self.triggers = ['skip']
self.usage = '{0}skip'
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
if config.get('voice') is not None and message.author.voice:
voice_channel = message.author.voice.channel
connection = get_channel_connection(config['voice']['connections'], voice_channel.id)
if connection:
connection['client'].stop()
await message.add_reaction('')
class Say(Command):
def __init__(self):
super().__init__()
self.name = 'say'
self.display_name = 'Say'
self.description = 'Speaks'
self.triggers = ['say']
self.usage = '{0}say something'
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
async with message.channel.typing():
speech_config = speechsdk.SpeechConfig(
subscription=config['azure_speech_key'],
region=config['azure_speech_region']
)
filename = f'./tts/{time.time()}'
audio_config = speechsdk.audio.AudioOutputConfig(use_default_speaker=False, filename=filename)
speech_config.speech_synthesis_voice_name = config['azure_speech_voice']
speech_synthesizer = speechsdk.SpeechSynthesizer(
speech_config=speech_config,
audio_config=audio_config
)
result = speech_synthesizer.speak_text(content)
if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
await message.add_reaction('')
# Workaround for annoying classes
play = Play()
await play.run(bot, config, message, f'tts:{filename}')
else:
await message.add_reaction('')

20
src/commands/command.py Normal file
View File

@@ -0,0 +1,20 @@
import discord
class Command():
def __init__(self):
# Useful boilerplate
# You should overwrite these values!
self.name = 'command' # No spaces please
self.display_name = 'Command'
self.description = 'command description'
self.triggers = ['command']
self.hidden = False
# {} will be .formatted to be the command prefix
self.usage = '{}command'
async def run(self, bot: discord.Client, config: dict, message: discord.message, content: str):
# Implement this!
pass

38
src/commands/help.py Normal file
View File

@@ -0,0 +1,38 @@
import discord
from commands.command import Command
class Help(Command):
def __init__(self):
super().__init__()
self.name = 'help'
self.display_name = 'Help'
self.description = 'Show a summary of the available bot commands'
self.triggers = ['help', 'h']
self.usage = '{0}help'
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
output = 'MemeMan 3 : Electric Boogaloo 2\n\t The finest meme connoisseur\n'
output = f'{output}\nCommands:'
if config.get('loaded_commands') == None:
output = f'{output}\nNo commands!'
else:
for command in config.get('loaded_commands').values():
if not command.hidden:
command_output = f'\t{command.display_name} : {command.description}\n'
usage = command.usage.format(config.get('command_prefix'))
command_prefix = config['command_prefix']
triggers_str = f"${f', {command_prefix}'.join(command.triggers)}"
command_output = f'{command_output}\t\tTriggers: {triggers_str}\n'
command_output = f'{command_output}\t\tUsage: {usage}\n'
output = f'{output}\n{command_output}'
output = f'```{output}```'
await message.reply(output)

19
src/core/bot.py Normal file
View File

@@ -0,0 +1,19 @@
import discord
from core.commands import load_commands
from core.messages import handle_message
class Bot(discord.Client):
def load_config(self: discord.Client, config: dict):
self.config = config
load_commands(self.config)
async def on_ready(self: discord.Client):
print(f'Logged in as {self.user}')
thicc_general = self.get_channel(447742387679264768)
if thicc_general is not None:
await thicc_general.send('https://media.tenor.co/images/0b29089d9a0504913b30a463a78fc66a/tenor.gif')
async def on_message(self: discord.Client, message: discord.Message):
await handle_message(self, self.config, message)

77
src/core/commands.py Normal file
View File

@@ -0,0 +1,77 @@
import discord
# Import commands
from commands.help import Help
from commands.audio import Play, Stop, Queue, List, Add, Say, Skip
def split_message(message: str, config: dict) -> list:
prefix = config.get('command_prefix')
if prefix == None:
return []
if message[0] != prefix:
return []
return message.split(prefix, 1)
def load_commands(config: dict):
print('Loading commands')
commands = [
Help,
Play,
Stop,
Queue,
List,
Add,
Say,
Skip
]
loaded_commands = {}
for command in commands:
loaded = command()
if loaded_commands.get(loaded.name) == None:
loaded_commands[loaded.name] = loaded
print(f'\tLoaded command "{loaded.name}"')
else:
print(f'\tA command with name "{loaded.name}" already exists - skipping')
continue
config['loaded_commands'] = loaded_commands
async def dispatch_command(bot: discord.Client, config: dict, message: discord.Message):
split_content = split_message(message.content, config)
if len(split_content) < 2:
print(split_content)
# no prefix
return
split_command = split_content[1].split(' ', 1)
command_name = split_command[0].lower()
message_content = ''
if len(split_command) > 1:
message_content = split_command[1]
print(f'Dispatching command "{command_name}"')
if config.get('loaded_commands') == None:
print('No loaded commands!')
return
found = False
for command in config.get('loaded_commands').values():
if command_name in command.triggers:
print(f'Matched trigger in {command.name}')
found = True
await command.run(bot, config, message, message_content)
break
if not found:
print(f'Could not find command with trigger {command_name}')
await message.reply(f'`{command_name}` is not a command!')

18
src/core/messages.py Normal file
View File

@@ -0,0 +1,18 @@
import discord
from core.commands import dispatch_command
async def handle_message(bot: discord.Client, config: dict, message: discord.Message):
"""
Handle messages received from Discord
"""
if message.author == bot.user:
# Ignore messages from the bot!
return
# Ping!
if message.content == 'ping':
await message.reply('pong!')
if message.content[0] == config.get('command_prefix'):
await dispatch_command(bot, config, message)

38
src/main.py Normal file
View File

@@ -0,0 +1,38 @@
import discord
import json
from core.bot import Bot
def main():
try:
with open('config.json', 'rt') as config_file:
config = json.load(config_file)
except OSError as e:
print(f'Could not open config.json! Error: {e.strerror}')
return
except Exception as e:
print(f'Error loading config. Error: {e}')
return
# Load token from token file if not in JSON config
if config['token'] == '':
try:
with open('token.txt', 'rt') as token_file:
config['token'] = token_file.readline()
except OSError as e:
print(f'Could not open token.txt! Error: {e.strerror}')
return
try:
intents = discord.Intents.default()
intents.message_content = True
bot = Bot(intents=intents)
bot.load_config(config)
bot.run(config['token'])
except Exception as e:
print(f'Exception running bot! Error: {e}')
return
if __name__ == '__main__':
main()