实现简易的播客下载客户端

  |  

摘要: 以多线程下载为核心的小项目

【对算法,数学,计算机感兴趣的同学,欢迎关注我哈,阅读更多原创文章】
我的网站:潮汐朝夕的生活实验室
我的公众号:算法题刷刷
我的知乎:潮汐朝夕
我的github:FennelDumplings
我的leetcode:FennelDumplings


背景介绍

在 Python 的 queue.Queue 是一个线程安全的先进先出数据结构。本文我们用 queue.Queue 加多线程技术,实现一个简易的播客下载的客户端。功能如下:

程序首先读取 RSS feeds,将每个 feeds 中最近的 5 条 feed 的附件打包后压进队列中,然后一一弹出进行下载,将多个下载任务用线程进行并行化。注意这里我们没有进行错误处理,在产品的代码中需要加上这一块。

实现多线程下载器

首先我们导入需要的标准库和第三方库,然后定义全局变量,例如队列,以及需要硬编码的信息,例如 feed 的 url,线程数等。

这里我们需要用到一个第三方库 feedparser,这是一个解析 RSS 的库,直接 pip 安装即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from queue import Queue
import threading
import time
import urllib
from urllib.parse import urlparse

import feedparser

num_fetch_threads = 2
enclosure_queue = Queue()

# A real app wouldn't use hard-coded data.
feed_urls = [
"http://talkpython.fm/episodes/rss",
]

def message(s):
print('{}: {}'.format(threading.current_thread().name, s))

接下来定义下载的函数,该函数是 worker 线程执行的函数,使用 urllib 执行下载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def download_enclosures(q):
""" This is the worker thread function.
It processes items in the queue one after
another. These daemon threads go into an
infinite loop, and exit only when
the main thread ends.
"""
while True:
message('looking for the next enclosure')
url = q.get()
filename = url.rpartition('/')[-1]
message('downloading {}'.format(filename))
response = urllib.request.urlopen(url)
data = response.read()
# Save the downloaded file to the current directory.
message('writing to {}'.format(filename))
with open(filename, 'wb') as outfile:
outfile.write(data)
q.task_done()

接下来在主函数中写业务流程,其中包含 feed 的解析逻辑:

step1: 启动下载线程

step2: 用 feedparser 模块检索 feed 内容,将附件的 url 压入队列。

当第一个附件的 url 压入队列,其中一个 worker 线程就会将其弹出并开始下载。队列会不断压入新的附件 url,直至 feed 耗尽,同时各个 worker 线程会持续从队列弹出 url 并下载。

step3: 压队的代码写完后,代码层面要做的事情就只有等待队列再次变空,调用队列实例的 join 方法即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def main():
# Set up some threads to fetch the enclosures.
for i in range(num_fetch_threads):
worker = threading.Thread(target=download_enclosures
,args=(enclosure_queue,)
,name='worker-{}'.format(i)
)
worker.setDaemon(True)
worker.start()

# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
response = feedparser.parse(url, agent='fetch_podcasts.py')
for entry in response['entries'][:5]:
for enclosure in entry.get('enclosures', []):
parsed_url = urlparse(enclosure['url'])
message('queuing {}'.format(parsed_url.path.rpartition('/')[-1]))
enclosure_queue.put(enclosure['url'])

# Now wait for the queue to be empty, indicating that we have
# processed all of the downloads.
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')

完整代码可以在以下 github 仓库中获取

1
https://github.com/FennelDumplings/python_toys

测试

运行代码后,打印的信息如下

运行完成后,可以看到下载的几个 mp3 文件


Share