哈喽
作为新手,开始接触 Ruby 这门编程语言
看起来感觉跟 Py 差不多舒适
都是比较简洁的语法
比隔壁的 Java 看起来舒服多了
目前不算怎么会写,暂时 get 不到所谓像诗一样的 Ruby 到底有多优雅,但至少代码第一观感还行
这次练手的项目是写一个脚本来自动下载小宇宙的播客节目(蛮喜欢小宇宙这个播客平台)
开发环境:macOS 自带的 Ruby 2.6 版本。下面附上代码,仅供学习参考!
#!/usr/bin/env ruby
# 小宇宙播客下载脚本(支持备用地址重试)
# 仅使用 Ruby 系统内置库,无需额外安装
# 1. 引入 Ruby 标准库
require 'fileutils'
require 'json'
require 'net/http'
require 'rbconfig' # RbConfig is referenced by desktop_path; require it explicitly
require 'timeout'
require 'uri'
# 2. Configuration constants (frozen so they cannot be mutated at runtime)
TIMEOUT_SEC = 300 # download timeout in seconds
NEXT_DATA_REGEX = /<script id="__NEXT_DATA__" type="application\/json">([\s\S]*?)<\/script>/.freeze
MEDIA_PATH = %w[props pageProps episode media].freeze # path to the media field
SOURCE_URL_PATH = %w[source url].freeze # primary download URL path (media -> source -> url)
BACKUP_SOURCE_URL_PATH = %w[backupSource url].freeze # backup download URL path (media -> backupSource -> url)
TITLE_PATH = %w[props pageProps episode title].freeze # path to the episode title
CHUNK_SIZE = 4096 # download chunk size in bytes — NOTE(review): currently unused; read_body streams its own chunks
INVALID_CHARS = /[\/\\:*?"<>|]/.freeze # characters not allowed in filenames
# 3. Resolve the user's Desktop directory across platforms.
# Uses Dir.home instead of raw ENV lookups: ENV['HOME'] may be unset, in which
# case the original File.join(nil, 'Desktop') raised a TypeError whose cause
# was swallowed by the bare rescue. macOS and other Unix-likes resolve the
# same way, so their branches are merged.
# Returns the Desktop path as a String; raises RuntimeError on failure.
def desktop_path
  home = case RbConfig::CONFIG['host_os']
         when /mswin|mingw|cygwin/ # Windows
           ENV['USERPROFILE'] || Dir.home
         else # macOS, Linux and other Unix-like systems
           Dir.home
         end
  File.join(home, 'Desktop')
rescue StandardError
  raise "无法获取桌面路径,请检查系统环境"
end
# Derive the file extension from a download URL's path component.
# Falls back to '.m4a' when the path has no extension or the URL is unparseable.
def extract_file_extension(url)
  parsed_ext = File.extname(URI.parse(url).path)
  return '.m4a' if parsed_ext.empty?
  parsed_ext
rescue
  '.m4a'
end
# Replace filesystem-unsafe characters with underscores and trim whitespace.
# Returns a placeholder name when nothing printable remains.
def clean_filename(raw_name)
  safe = raw_name.gsub(INVALID_CHARS, '_').strip
  safe.empty? ? "未命名节目" : safe
end
# Build a save path under +desktop+ for "base_name + ext", appending _1, _2, …
# to the base name until a non-existing path is found.
def resolve_save_path(desktop, base_name, ext)
  candidate = File.join(desktop, "#{base_name}#{ext}")
  suffix = 0
  while File.exist?(candidate)
    suffix += 1
    candidate = File.join(desktop, "#{base_name}_#{suffix}#{ext}")
  end
  candidate
end
# Download +url+ to +save_path+, streaming the body to disk with a console
# progress indicator. Extracted as its own method so the caller can retry the
# same save path with a different (backup) URL.
# Raises RuntimeError (Chinese message) on timeout or any failure; the partial
# file is deleted before re-raising so a retry starts clean.
def download_with_url(url, save_path)
  uri = URI.parse(url)
  puts "\n📥 开始下载(地址:#{url.split('/')[-1]}):#{File.basename(save_path)}"
  puts "📌 保存路径:#{save_path}"
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == 'https'
  http.open_timeout = TIMEOUT_SEC
  http.read_timeout = TIMEOUT_SEC
  total_size = nil
  begin
    # HEAD request first, solely to learn Content-Length for the progress bar.
    head_response = http.request(Net::HTTP::Head.new(uri))
    total_size = head_response['Content-Length']&.to_i
  rescue
    # The size is optional — download proceeds without a percentage display.
    puts "⚠️ 无法获取文件大小,将继续下载..."
  end
  downloaded_size = 0
  File.open(save_path, 'wb') do |file|
    http.request(Net::HTTP::Get.new(uri)) do |response|
      unless response.is_a?(Net::HTTPSuccess)
        raise "下载请求失败,状态码:#{response.code}"
      end
      # Stream the body chunk by chunk instead of buffering it all in memory.
      response.read_body do |chunk|
        file.write(chunk)
        downloaded_size += chunk.size
        if total_size
          progress = (downloaded_size.to_f / total_size * 100).round(1)
          print "\r⏳ 下载进度:#{progress}% (#{format_size(downloaded_size)}/#{format_size(total_size)})"
        else
          print "\r⏳ 已下载:#{format_size(downloaded_size)}"
        end
      end
    end
  end
  puts "\n✅ 下载完成!文件已保存到桌面"
rescue Timeout::Error
  # NOTE(review): assumes Net::OpenTimeout/Net::ReadTimeout subclass
  # Timeout::Error so they land here — confirm on the target Ruby version.
  FileUtils.rm_f(save_path)
  raise "下载超时(超过 #{TIMEOUT_SEC} 秒)"
rescue => e
  # Remove the partial file so a retry with the backup URL starts clean.
  FileUtils.rm_f(save_path)
  raise "下载失败:#{e.message}"
end
# Render a byte count as a human-readable string: "N B", "N.NN KB" or "N.NN MB".
def format_size(bytes)
  if bytes < 1024
    "#{bytes} B"
  elsif bytes < 1024**2
    format('%.2f KB', bytes / 1024.0)
  else
    format('%.2f MB', bytes / (1024**2).to_f)
  end
end
# Prompt the user (in a loop) for a Xiaoyuzhou episode page link and return it
# once it passes basic validation: non-empty, http/https scheme, host contains
# xiaoyuzhoufm.com. Invalid input re-prompts.
# Fix: the original called gets.chomp, which crashes with NoMethodError when
# stdin hits EOF (e.g. exhausted piped input); we now raise a clear error.
def get_target_url
  loop do
    print "\n请输入小宇宙播客详情页链接(例如:https://www.xiaoyuzhoufm.com/episode/xxx):"
    input = gets
    # gets returns nil on EOF — bail out instead of crashing on nil.chomp
    raise "输入已结束,未获得有效链接" if input.nil?
    url = input.chomp
    if url.empty?
      puts "⚠️ 链接不能为空,请重新输入!"
      next
    end
    begin
      uri = URI.parse(url)
      unless uri.scheme && %w[http https].include?(uri.scheme) && uri.host&.include?('xiaoyuzhoufm.com')
        raise "格式错误"
      end
    rescue
      puts "⚠️ 链接格式无效(需包含 http/https 且为小宇宙域名),请重新输入!"
      next
    end
    return url
  end
end
# Fetch the episode page HTML with browser-like headers (UA/Accept/Referer),
# since default Ruby user agents may be blocked. Returns the body as a String.
# Raises RuntimeError (Chinese message) on non-200 status, a 30s timeout, or
# any other network error.
# NOTE(review): 3xx redirects are not followed — a moved page surfaces as a
# "request failed" error; confirm the site serves episode pages directly.
def fetch_page_html(url)
  uri = URI.parse(url)
  puts "\n🔍 正在请求目标页面:#{url}"
  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == 'https'
  http.open_timeout = 30
  http.read_timeout = 30
  request = Net::HTTP::Get.new(uri)
  request['User-Agent'] = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
  request['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
  request['Referer'] = 'https://www.xiaoyuzhoufm.com/'
  response = http.request(request)
  unless response.code.to_i == 200
    raise "页面请求失败,状态码:#{response.code}(可能链接无效或被拦截)"
  end
  response.body
rescue Timeout::Error
  raise "网络请求超时(超过 30 秒),请检查网络连接"
rescue => e
  raise "网络请求异常:#{e.message}"
end
# Pull the embedded __NEXT_DATA__ JSON payload out of the page HTML.
# Returns the stripped JSON text; raises when the script tag is absent.
def extract_next_data(html)
  payload = html[NEXT_DATA_REGEX, 1]
  raise "未在页面中找到 __NEXT_DATA__ 字段(可能页面结构已变更)" if payload.nil?
  payload.strip
end
# Parse the __NEXT_DATA__ JSON and walk TITLE_PATH down to the episode title.
# Raises when the JSON is invalid, a level of the path is missing, or the
# resolved title is not a non-blank String. Returns the stripped title.
def extract_episode_title(next_data_json)
  begin
    node = JSON.parse(next_data_json)
  rescue JSON::ParserError => e
    raise "JSON 解析失败:#{e.message}(无法提取节目名称)"
  end
  TITLE_PATH.each_with_index do |key, depth|
    reachable = node.is_a?(Hash) && node.key?(key)
    raise "未找到节目名称,缺失层级:#{TITLE_PATH[0..depth].join(' -> ')}" unless reachable
    node = node[key]
  end
  valid_title = node.is_a?(String) && !node.strip.empty?
  raise "提取的节目名称无效:#{node.inspect}" unless valid_title
  node.strip
end
# Parse the __NEXT_DATA__ JSON and walk MEDIA_PATH, returning the media value.
# Raises when the JSON is invalid or any level of the path is missing.
def extract_media_field(next_data_json)
  parsed = begin
    JSON.parse(next_data_json)
  rescue JSON::ParserError => e
    raise "JSON 解析失败:#{e.message}"
  end
  MEDIA_PATH.each_with_index.inject(parsed) do |node, (key, depth)|
    unless node.is_a?(Hash) && node.key?(key)
      raise "未找到media字段,缺失层级:#{MEDIA_PATH[0..depth].join(' -> ')}"
    end
    node[key]
  end
end
# Walk SOURCE_URL_PATH inside the media hash to the primary download URL.
# Raises when media is not a Hash, a level is missing, or validate_url rejects
# the resolved value. Returns the URL string.
def extract_download_url(media_data)
  raise "media字段格式异常(不是哈希类型)" unless media_data.is_a?(Hash)
  node = media_data
  SOURCE_URL_PATH.each_with_index do |key, depth|
    unless node.is_a?(Hash) && node.key?(key)
      trail = (["media"] + SOURCE_URL_PATH[0..depth]).join(' -> ')
      raise "未找到主下载地址,缺失层级:#{trail}"
    end
    node = node[key]
  end
  validate_url(node)
  node
end
# Walk BACKUP_SOURCE_URL_PATH inside the media hash to the backup download
# URL. Unlike the primary extractor this never raises: a missing level or an
# invalid URL prints an informational line and yields nil.
def extract_backup_download_url(media_data)
  return nil unless media_data.is_a?(Hash)
  node = media_data
  BACKUP_SOURCE_URL_PATH.each_with_index do |key, depth|
    unless node.is_a?(Hash) && node.key?(key)
      # A missing backup address is not an error — report it and give up.
      puts "ℹ️ 未找到备用下载地址(缺失层级:media -> #{BACKUP_SOURCE_URL_PATH[0..depth].join(' -> ')})"
      return nil
    end
    node = node[key]
  end
  validate_url(node)
  node
rescue
  puts "ℹ️ 备用下载地址格式无效:#{node}"
  nil
end
# Validate that +url+ is a non-blank String with an http/https scheme.
# Returns nil on success; raises RuntimeError (Chinese message) otherwise.
#
# Fixes two bugs in the original:
# 1. The early `return if valid` made the raise below it AND the whole
#    URI.parse/scheme check unreachable dead code.
# 2. The trailing bare rescue caught the method's own "invalid format"
#    RuntimeError and replaced it with the "parse failed" message.
def validate_url(url)
  unless url.is_a?(String) && !url.strip.empty?
    raise "URL格式无效(非字符串或为空):#{url.inspect}"
  end
  begin
    uri = URI.parse(url)
  rescue URI::InvalidURIError
    # Only an actual parse failure maps to this message now.
    raise "URL解析失败:#{url}"
  end
  unless uri.scheme && %w[http https].include?(uri.scheme)
    raise "URL协议错误(需http/https):#{url}"
  end
  nil
end
# Main flow: prompt for a link, fetch and scrape the page, extract the title
# and download URLs, then download to the desktop — retrying with the backup
# URL if the primary fails. Exits with status 1 on any unrecoverable error.
def main
  puts "======================================"
  puts "小宇宙播客下载脚本(支持备用地址重试)"
  puts "功能:主地址失败自动尝试备用地址 → 提取节目名称 → 下载到桌面"
  puts "主地址路径:media -> source -> url"
  puts "备用地址路径:media -> backupSource -> url"
  puts "======================================"
  begin
    target_url = get_target_url
    page_html = fetch_page_html(target_url)
    next_data_json = extract_next_data(page_html)
    episode_title = extract_episode_title(next_data_json)
    puts "\n📌 提取到节目名称:#{episode_title}"
    media_data = extract_media_field(next_data_json)
    main_url = extract_download_url(media_data)
    puts "📥 提取到主下载地址:#{main_url.split('/')[-1]}"
    # Extract the backup URL (its absence does not interrupt the flow)
    backup_url = extract_backup_download_url(media_data)
    puts "📥 提取到备用下载地址:#{backup_url.split('/')[-1]}" if backup_url
    # Build a safe filename and a collision-free save path
    cleaned_title = clean_filename(episode_title)
    file_ext = extract_file_extension(main_url) # use the primary URL's extension (backup format usually matches)
    desktop = desktop_path
    save_path = resolve_save_path(desktop, cleaned_title, file_ext)
    # Try the primary address first
    begin
      download_with_url(main_url, save_path)
    rescue => e
      puts "⚠️ 主地址下载失败:#{e.message}"
      # Retry with the backup address when the primary fails and one exists
      if backup_url
        puts "🔄 尝试使用备用地址下载..."
        download_with_url(backup_url, save_path)
      else
        raise "主地址下载失败,且无可用备用地址"
      end
    end
  rescue => e
    puts "\n❌ 执行失败:#{e.message}"
    exit 1
  end
  puts "\n🎉 操作完成!"
end
# Script entry point
main