跳转到主要内容

category

在这篇文章中,我演示了如何使用Python中的OpenAI Whisper近乎实时地转录实时音频流。我们这样做是为了监视流中的特定关键字。此外,转录后的文本会记录时间戳以供进一步使用。通过对转录的文本进行模糊匹配,我们可以找到对关键词的提及。然后,我们通过信号信使向包含口语段落相关部分的团体或个人触发一条信息。

背景

这是一个在周末建立的快速POC:为了赢得比赛,我想监控当地电台是否提到了一些关键词。这需要迅速完成,这产生了一个简单的解决方案。此外,它必须尽可能节约资源,以最大限度地降低基础设施成本。虽然它并不是以稳定性为主要关注点来构建的,但它实际上在几周内表现完美,没有任何停机时间。因此,目标实现了!

所有代码在此回购中都可用。在下文中,我将介绍解决方案的总体结构,并解释代码的一些相关部分。

概述

该解决方案由三部分组成:

save_stream.py从实时音频流中以30秒为单位连续保存.mp3文件

transcript.py使用OpenAI Whisper永久转录每个音频块。然后,它使用模糊匹配来监控口语中的关键词。在比赛中,它调用msg_group_via_signal.sh

msg_group_via_signal.sh将报警消息中继到信号cli工具,该工具在信号信使上向一个组发送消息

我们使用OpenAI的Whisper,因为它是目前性能最好的音频转录模型之一。此外,它很容易买到,有不同的型号尺寸。使用小模型,即使在非英语音频上,我们也能获得不错的结果。此外,它的资源效率足够高,可以在CPU上运行而不会落后于流。我在AWS上的一台c5a.大型EC2机器上部署它取得了很好的效果,每月花费约65美元。为了说明不完美的转录质量,我们在转录中寻找关键词时使用模糊搜索。因此,我们减少了假阴性(但增加了假阳性)警报。有了更好的规格/GPU,您可以增加模型大小以获得更高质量的转录。

细节

让我们来看看其中的一些代码。我们从save_stream.py开始,它主要由以下函数组成:

def record_stream_to_file(stream: requests.Response):
    """Record stream audio to files as .mp3 in chunks during recording times
    Args:
        stream (requests.Response): Audio stream
    """
    start_utc = datetime.utcnow()
    start_local = datetime.now(tz=LOCAL_TZ)
    current_local_time = start_local.time()
    log.info(
        "Current tz time: %s. Stream from: %s Stream until: %s",
        current_local_time,
        STREAM_TIME_FROM,
        STREAM_TIME_TO,
    )
    if not STREAM_TIME_FROM < current_local_time < STREAM_TIME_TO:
        log.warning("Not during recording time")
        sys.exit(0)
    filename = DATA_PATH + "/stream_" + start_utc.isoformat(timespec="seconds") + ".mp3"
    log.info("Writing stream to: %s", filename)
    with open(filename, "wb") as file:
        try:
            for block in stream.iter_content(1024):
                file.write(block)
                if datetime.utcnow() - start_utc > timedelta(
                    seconds=CHUNK_TIME_SECONDS
                ):
                    file.close()
                    record_stream_to_file(stream)
        except KeyboardInterrupt:
            log.info("Received keyboard interrupt")
            sys.exit(0)

 

它非常直接:一开始,我们检查当前的本地时间是否在预定义的时间段内。这是我们想要监控的时期。在此期间之外,我们只是退出脚本。为了进行部署,我创建了一个简单的bash脚本,用于检查python脚本是否已经在运行,并以其他方式启动它。然后,我使用cron每分钟启动一次这个bash脚本。因此,请确保我们的save_stream.py只在需要时进行记录。很简单!谁还需要气流?

在下一步中,我们对请求进行迭代。按块方式响应对象(打开的流)。我们将每个chuck写入一个文件,直到达到时间限制(对我来说是30秒),然后再次递归调用该函数。

因此,我们将在本地文件夹中存储一个30秒长的.mp3文件流,并准备进行转录。

接下来,我们将重点讨论transcript.py脚本。这是我们解决方案的核心。与第一个脚本类似,它也只在一天中的监控期内运行,否则就会停止。

第一步是从上游脚本获取所有最近的.mp3文件:

def get_recent_files() -> list:
    """Return file paths for recently created files
    Returns:
        list: File paths
    """
    log.info("Listing recent files")
    now = datetime.utcnow()
    audio_files = []
    for file in sorted(Path(PATH_AUDIO_FILES).iterdir()):
        if ".mp3" in file.name:
            file_ts = datetime.fromtimestamp(file.stat().st_ctime)
            if now - file_ts <= timedelta(minutes=RECENT_FILES_TIME_MIN):
                audio_files.append(file)
    log.debug("Recent files: %s", audio_files)
    return audio_files

我们根据时间戳对音频文件夹中的文件进行排序,并检查它们是否在我们的最近性限制内。如果是,则将它们添加到列表中并进行处理。这是一种使我们的代码更加健壮的措施:如果转录脚本失败并在任何时候存在,它将由cron重新启动。我们希望它继续转录,但仅限于流的最近几分钟。否则,它将落后,不再是实时的。

转录功能本身非常简单:

def transcribe_file(model, options, file_path: str) -> str:
    """Transcribe the .mp3 file to text
    Args:
        model: Whisper Model
        file_path (str): File path
    Returns:
        str: Transcribed text
    """
    audio = whisper.load_audio(file_path)
    audio = whisper.pad_or_trim(audio)
    mel = whisper.log_mel_spectrogram(audio).to(model.device)
    result = whisper.decode(model, mel, options)
    return result.text  # type: ignore

Whisper库为我们承担了所有繁重的工作。因此,我们的音频文件的加载、预处理和解码需要四行代码。这是一个欣赏Python的简单性和开源开发人员定期投入的辛勤工作的好时机!如果您想了解创建模型输入所需的信号处理的更多信息,我建议您阅读以下内容。官方博客文章介绍了耳语,这是另一个引人入胜的阅读。

现在,我们已经将语音转换为文本,我们可以监控文本中的关键字:

def search_for_text(text: str):
    """Search for search term in text and send alarm if found"
    Args:
        text (str): Text to search
    """
    log.info("Searching in text")
    text = text.lower()

    for term in SEARCH_TERMS_LIVE:
        results = find_near_matches(term, text, max_l_dist=2)
        if results:
            log.debug("Search results: %s", results)
            log.info("Found live term: %s", term)
            send_alarm_to_signal(text, live=True)

    for term in SEARCH_TERMS_DEV:
        results = find_near_matches(term, text, max_l_dist=1)
        if results:
            log.debug("Search results: %s", results)
            log.info("Found dev term: %s", term)
            send_alarm_to_signal(text, live=False)

我们使用fuzzysearch包将转录的文本与我们的关键词进行比较。这使我们能够匹配我们的关键词,即使它们没有被完美地转录。从而减少了出现假阴性的机会。文中应用了简单快速的Levenstein距离算法。

最后,在关键字匹配时,我们调用send_alarm_to_signal函数:

def send_alarm_to_signal(text: str, live=False):
    """Send alarm via signal bash script
    Args:
        text (str): Text with match
        live (bool, optional): Live or test. Defaults to False.
    """

    message = "This is a test. I've picked up the following: \n"
    if live:
        message = "This is a LIVE. I've picked up the following:\n"
    message = message + text
    subprocess.Popen([PATH_TO_SIGNAL_SCRIPT, message])

这只需调用bash脚本,并将转录文本的一部分与之匹配。同样,bash脚本非常简单:

#!/bin/bash
echo "Sending message:"
echo "$1"
echo ""
echo "$1" | /usr/local/bin/signal-cli send --message-from-stdin -g "MyGroupID"

它调用信号cli工具,通过stdin回显所依赖的消息。当正确设置(按照快速启动)时,您可以使用该工具作为Signal Messenger机器人。在这种情况下,我们使用它向一个组发送消息,提醒它音频流中的关键字匹配!

 

文章链接