Audio, TTS
This commit is contained in:
302
src/commands/audio.py
Normal file
302
src/commands/audio.py
Normal file
@@ -0,0 +1,302 @@
|
||||
import asyncio
|
||||
import azure.cognitiveservices.speech as speechsdk
|
||||
import discord
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
import yt_dlp
|
||||
|
||||
from commands.command import Command
|
||||
|
||||
# Taken from https://github.com/Rapptz/discord.py/blob/master/examples/basic_voice.py
|
||||
# Suppress noise about console usage from errors
|
||||
yt_dlp.utils.bug_reports_message = lambda: ''
|
||||
|
||||
ytdl_format_options = {
|
||||
'format': 'bestaudio/best',
|
||||
'outtmpl': './tmp/%(extractor)s-%(id)s-%(title)s.%(ext)s',
|
||||
'restrictfilenames': True,
|
||||
'noplaylist': True,
|
||||
'nocheckcertificate': True,
|
||||
'ignoreerrors': False,
|
||||
'logtostderr': False,
|
||||
'quiet': True,
|
||||
'no_warnings': True,
|
||||
'default_search': 'auto',
|
||||
'source_address': '0.0.0.0', # bind to ipv4 since ipv6 addresses cause issues sometimes
|
||||
}
|
||||
|
||||
ffmpeg_options = {
|
||||
'options': '-vn',
|
||||
}
|
||||
|
||||
ytdl = yt_dlp.YoutubeDL(ytdl_format_options)
|
||||
|
||||
def get_channel_connection(connections: list, channel_id: int) -> dict | None:
|
||||
for connection in connections:
|
||||
if connection['id'] == channel_id:
|
||||
return connection
|
||||
|
||||
return None
|
||||
|
||||
def remove_channel_connection(connections: list, channel_id: int) -> None:
|
||||
connection = get_channel_connection(connections, channel_id)
|
||||
if connection is not None:
|
||||
connections.remove(connection)
|
||||
|
||||
|
||||
class YTDLSource(discord.PCMVolumeTransformer):
|
||||
def __init__(self, source, *, data, volume=0.5):
|
||||
super().__init__(source, volume)
|
||||
|
||||
self.data = data
|
||||
|
||||
self.title = data.get('title')
|
||||
self.url = data.get('url')
|
||||
|
||||
@classmethod
|
||||
async def from_url(cls, url, *, loop=None, stream=False):
|
||||
loop = loop or asyncio.get_event_loop()
|
||||
data = await loop.run_in_executor(None, lambda: ytdl.extract_info(url, download=not stream))
|
||||
|
||||
if 'entries' in data:
|
||||
# take first item from a playlist
|
||||
data = data['entries'][0]
|
||||
|
||||
filename = data['url'] if stream else ytdl.prepare_filename(data)
|
||||
return cls(discord.FFmpegPCMAudio(filename, **ffmpeg_options), data=data)
|
||||
|
||||
class Play(Command):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self.name = 'play'
|
||||
self.display_name = 'Play'
|
||||
self.description = 'Play a sound via your voice channel, either locally or from a URL. https://ytb-dl.github.io/ytb-dl/supportedsites.html'
|
||||
self.triggers = ['play', 'p']
|
||||
|
||||
self.usage = '{0}play https://www.youtube.com/watch?v=dQw4w9WgXcQ'
|
||||
|
||||
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
|
||||
async def play(sound: str, connection: dict):
|
||||
# Check TTS
|
||||
if sound.startswith('tts:'):
|
||||
filename = sound.replace('tts:', '', 1)
|
||||
if os.path.exists(filename):
|
||||
player = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(filename, **ffmpeg_options))
|
||||
connection['client'].play(player, after=player_finished)
|
||||
else:
|
||||
# Check local first
|
||||
for file in os.listdir(config['sounds_dir']):
|
||||
if file == sound:
|
||||
full_path = f'{config["sounds_dir"]}/{file}'
|
||||
if os.path.isfile(file):
|
||||
player = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(full_path, **ffmpeg_options))
|
||||
connection['client'].play(player, after=player_finished)
|
||||
else:
|
||||
dir_files = os.listdir(full_path)
|
||||
random_file = dir_files[random.randint(0, len(dir_files) - 1)]
|
||||
full_dir_path = f'{full_path}/{random_file}'
|
||||
|
||||
player = discord.PCMVolumeTransformer(discord.FFmpegPCMAudio(full_dir_path, **ffmpeg_options))
|
||||
connection['client'].play(player, after=player_finished)
|
||||
|
||||
if not connection['client'].is_playing():
|
||||
# Assume if we're not playing then it couldn't find one
|
||||
player = await YTDLSource.from_url(sound, stream=False)
|
||||
connection['client'].play(player, after=player_finished)
|
||||
|
||||
def player_finished(e: Exception):
|
||||
voice_channel = message.author.voice.channel
|
||||
connection = get_channel_connection(config['voice']['connections'], voice_channel.id)
|
||||
|
||||
if e:
|
||||
print(f'Player error: {e}')
|
||||
elif len(connection['queue']) > 0:
|
||||
# Play next
|
||||
next = connection['queue'][0]
|
||||
connection['queue'].remove(next)
|
||||
asyncio.run_coroutine_threadsafe(play(next, connection), bot.loop)
|
||||
else:
|
||||
# Disconnect
|
||||
asyncio.run_coroutine_threadsafe(connection['client'].disconnect(), bot.loop)
|
||||
remove_channel_connection(config['voice']['connections'], voice_channel.id)
|
||||
pass
|
||||
|
||||
async with message.channel.typing():
|
||||
if message.author.voice:
|
||||
voice_channel = message.author.voice.channel
|
||||
# Connect to voice
|
||||
if config.get('voice') is None:
|
||||
config['voice'] = {
|
||||
'connections': []
|
||||
}
|
||||
|
||||
# Are we connected?
|
||||
connection = get_channel_connection(config['voice']['connections'], voice_channel.id)
|
||||
|
||||
if not connection:
|
||||
client = await voice_channel.connect()
|
||||
connection = {'id': voice_channel.id, 'queue': [], 'client': client}
|
||||
config['voice']['connections'].append(connection)
|
||||
|
||||
# Are we playing?
|
||||
if connection['client'].is_playing():
|
||||
connection['queue'].append(content)
|
||||
else:
|
||||
await play(content, connection)
|
||||
|
||||
await message.add_reaction('✅')
|
||||
|
||||
else:
|
||||
await message.reply('You are not connected to a voice channel!')
|
||||
|
||||
class Stop(Command):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self.name = 'stop'
|
||||
self.display_name = 'Stop'
|
||||
self.description = 'Stop what\'s currently playing in your voice channel'
|
||||
self.triggers = ['stop', 's']
|
||||
|
||||
self.usage = '{0}stop'
|
||||
|
||||
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
|
||||
if message.author.voice:
|
||||
voice_channel = message.author.voice.channel
|
||||
|
||||
connection = get_channel_connection(config['voice']['connections'], voice_channel.id)
|
||||
if connection is not None:
|
||||
await connection['client'].disconnect()
|
||||
remove_channel_connection(config['voice']['connections'], voice_channel.id)
|
||||
|
||||
class List(Command):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self.name = 'list'
|
||||
self.display_name = 'List'
|
||||
self.description = 'List available saved sounds'
|
||||
self.triggers = ['list', 'l']
|
||||
|
||||
self.usage = '{0}list'
|
||||
|
||||
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
|
||||
sounds = os.listdir(config['sounds_dir'])
|
||||
|
||||
for i in range(0, len(sounds), 100):
|
||||
output = '\n'.join(sounds[i:i+100])
|
||||
output = f'```{output}```'
|
||||
await message.reply(output)
|
||||
|
||||
class Add(Command):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self.name = 'add'
|
||||
self.display_name = 'Add'
|
||||
self.description = 'Save a sound to the bot'
|
||||
self.triggers = ['add']
|
||||
|
||||
self.hidden = True
|
||||
|
||||
self.usage = '{0}add sound_name https://www.youtube.com/watch?v=dQw4w9WgXcQ'
|
||||
|
||||
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
|
||||
global ytdl_format_options
|
||||
|
||||
sound_details = content.split(' ', 1)
|
||||
sound_name = sound_details[0]
|
||||
sound_url = sound_details[1]
|
||||
|
||||
ytdl_args = ytdl_format_options.copy()
|
||||
ytdl_args['outtmpl'] = f'./{config["sounds_dir"]}/{sound_name}/%(extractor)s-%(id)s-%(title)s.%(ext)s'
|
||||
add_ytdl = yt_dlp.YoutubeDL(ytdl_args)
|
||||
|
||||
add_ytdl.download([sound_url])
|
||||
await message.add_reaction('✅')
|
||||
|
||||
class Queue(Command):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self.name = 'queue'
|
||||
self.display_name = 'Queue'
|
||||
self.description = 'List the play queue for your voice channel'
|
||||
self.triggers = ['queue', 'q']
|
||||
|
||||
self.usage = '{0}queue'
|
||||
|
||||
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
|
||||
if config.get('voice') is not None and message.author.voice:
|
||||
voice_channel = message.author.voice.channel
|
||||
connection = get_channel_connection(config['voice']['connections'], voice_channel.id)
|
||||
if connection:
|
||||
queue: list = connection['queue']
|
||||
output = ''
|
||||
if len(queue) > 0:
|
||||
output = f'Queue for {voice_channel.name}\n'
|
||||
for queued in queue:
|
||||
output += f' - {queued}\n'
|
||||
else:
|
||||
output = 'https://media.tenor.com/F5MOelsQd2IAAAAC/patrick-star.gif'
|
||||
|
||||
await message.reply(output)
|
||||
|
||||
class Skip(Command):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self.name = 'skip'
|
||||
self.display_name = 'Skip'
|
||||
self.description = 'Skip to the next sound in the queue'
|
||||
self.triggers = ['skip']
|
||||
|
||||
self.usage = '{0}skip'
|
||||
|
||||
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
|
||||
if config.get('voice') is not None and message.author.voice:
|
||||
voice_channel = message.author.voice.channel
|
||||
connection = get_channel_connection(config['voice']['connections'], voice_channel.id)
|
||||
if connection:
|
||||
connection['client'].stop()
|
||||
|
||||
await message.add_reaction('✅')
|
||||
|
||||
|
||||
class Say(Command):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self.name = 'say'
|
||||
self.display_name = 'Say'
|
||||
self.description = 'Speaks'
|
||||
self.triggers = ['say']
|
||||
|
||||
self.usage = '{0}say something'
|
||||
|
||||
async def run(self, bot: discord.Client, config: dict, message: discord.Message, content: str):
|
||||
async with message.channel.typing():
|
||||
speech_config = speechsdk.SpeechConfig(
|
||||
subscription=config['azure_speech_key'],
|
||||
region=config['azure_speech_region']
|
||||
)
|
||||
filename = f'./tts/{time.time()}'
|
||||
audio_config = speechsdk.audio.AudioOutputConfig(use_default_speaker=False, filename=filename)
|
||||
speech_config.speech_synthesis_voice_name = config['azure_speech_voice']
|
||||
|
||||
speech_synthesizer = speechsdk.SpeechSynthesizer(
|
||||
speech_config=speech_config,
|
||||
audio_config=audio_config
|
||||
)
|
||||
|
||||
result = speech_synthesizer.speak_text(content)
|
||||
if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
|
||||
await message.add_reaction('✅')
|
||||
|
||||
# Workaround for annoying classes
|
||||
play = Play()
|
||||
await play.run(bot, config, message, f'tts:{filename}')
|
||||
else:
|
||||
await message.add_reaction('❌')
|
||||
Reference in New Issue
Block a user