Ruby Ruby Girl,小白学习 Ruby 第一弹 尝试下载小宇宙播客节目

AixCoder · 2025年10月24日 · 最后由 daqing 回复于 2025年10月25日 · 159 次阅读

哈喽😀
作为新手,开始接触 Ruby 这门编程语言

看起来感觉跟 Py 差不多舒适

都是比较简洁的语法

比那隔壁的 Java 看起来感觉好多了

目前不算怎么会写,暂时 get 不到所谓像诗一样的 Ruby 到底有多优雅,但至少代码第一观感还行

这次练手的项目是写一个脚本来自动下载小宇宙的播客节目(蛮喜欢小宇宙这个播客平台)

开发环境:macOS 自带的 Ruby 2.6 版本。下面附上代码,仅供学习参考!

#!/usr/bin/env ruby
# 小宇宙播客下载脚本(支持备用地址重试)
# 仅使用 Ruby 系统内置库,无需额外安装

# 1. 引入 Ruby 标准库
require 'net/http'
require 'uri'
require 'timeout'
require 'json'
require 'fileutils'

# 2. Configuration
TIMEOUT_SEC = 300  # download timeout in seconds (applied to both open and read)
# Matches the JSON payload Next.js embeds in the episode page's <script id="__NEXT_DATA__"> tag.
NEXT_DATA_REGEX = /<script id="__NEXT_DATA__" type="application\/json">([\s\S]*?)<\/script>/
MEDIA_PATH = %w[props pageProps episode media].freeze           # path to the media hash
SOURCE_URL_PATH = %w[source url].freeze                         # primary URL (media -> source -> url)
BACKUP_SOURCE_URL_PATH = %w[backupSource url].freeze            # backup URL (media -> backupSource -> url)
TITLE_PATH = %w[props pageProps episode title].freeze           # path to the episode title
CHUNK_SIZE = 4096  # intended download chunk size in bytes — NOTE(review): currently unused
INVALID_CHARS = /[\/\\:*?"<>|]/  # characters that are unsafe in filenames on common filesystems

# 3. Returns the current user's Desktop directory, best effort across platforms.
# Windows uses %USERPROFILE%; macOS, Linux and other Unix-likes use $HOME.
# Raises RuntimeError when the relevant environment variable is missing.
def desktop_path
  # Fix: the original duplicated identical darwin/else branches, and a missing
  # env var would have surfaced as an opaque TypeError inside File.join.
  home = case RbConfig::CONFIG['host_os']
         when /mswin|mingw|cygwin/ then ENV.fetch('USERPROFILE') # Windows
         else ENV.fetch('HOME')                                  # macOS / Linux / other Unix
         end
  File.join(home, 'Desktop')
rescue StandardError
  raise "无法获取桌面路径,请检查系统环境"
end

# Derives a file extension from the download URL's path component.
# Falls back to '.m4a' when the path has no extension or the URL is unparseable.
def extract_file_extension(url)
  path_ext = File.extname(URI.parse(url).path)
  path_ext.empty? ? '.m4a' : path_ext
rescue StandardError
  '.m4a'
end

# Replaces filesystem-unsafe characters with underscores and trims surrounding
# whitespace; returns a placeholder name when nothing usable remains.
def clean_filename(raw_name)
  safe = raw_name.gsub(INVALID_CHARS, '_').strip
  safe.empty? ? "未命名节目" : safe
end

# Builds a collision-free save path under +desktop+: tries "name.ext" first,
# then "name_1.ext", "name_2.ext", ... until a non-existing path is found.
def resolve_save_path(desktop, base_name, ext)
  candidate = File.join(desktop, "#{base_name}#{ext}")
  suffix = 0
  while File.exist?(candidate)
    suffix += 1
    candidate = File.join(desktop, "#{base_name}_#{suffix}#{ext}")
  end
  candidate
end

# Streams the file at +url+ to +save_path+, printing progress as it goes.
# Extracted as its own method so the caller can retry with the backup URL.
# Raises RuntimeError on timeout or any failure; the partially written file
# is removed before re-raising.
def download_with_url(url, save_path)
  uri = URI.parse(url)
  puts "\n📥 开始下载(地址:#{url.split('/')[-1]}):#{File.basename(save_path)}"
  puts "📌 保存路径:#{save_path}"

  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == 'https'
  http.open_timeout = TIMEOUT_SEC
  http.read_timeout = TIMEOUT_SEC

  # Best-effort HEAD request for Content-Length so a percentage can be shown;
  # failure here is non-fatal and only disables the percentage display.
  total_size = nil
  begin
    head_response = http.request(Net::HTTP::Head.new(uri))
    total_size = head_response['Content-Length']&.to_i
  rescue
    puts "⚠️ 无法获取文件大小,将继续下载..."
  end

  downloaded_size = 0
  File.open(save_path, 'wb') do |file|
    # Block form of #request streams the body instead of buffering it in memory.
    http.request(Net::HTTP::Get.new(uri)) do |response|
      # NOTE(review): redirects (3xx) are not followed and will land here too;
      # this raise is caught by the trailing rescue and re-wrapped as "下载失败:…".
      unless response.is_a?(Net::HTTPSuccess)
        raise "下载请求失败,状态码:#{response.code}"
      end

      response.read_body do |chunk|
        file.write(chunk)
        downloaded_size += chunk.size

        if total_size
          # Percentage progress when the total size is known.
          progress = (downloaded_size.to_f / total_size * 100).round(1)
          print "\r⏳ 下载进度:#{progress}% (#{format_size(downloaded_size)}/#{format_size(total_size)})"
        else
          # Otherwise just show the running byte count.
          print "\r⏳ 已下载:#{format_size(downloaded_size)}"
        end
      end
    end
  end

  puts "\n✅ 下载完成!文件已保存到桌面"
rescue Timeout::Error
  # Net's open/read timeout errors inherit from Timeout::Error.
  FileUtils.rm_f(save_path)  # discard the partial download
  raise "下载超时(超过 #{TIMEOUT_SEC} 秒)"
rescue => e
  FileUtils.rm_f(save_path)  # discard the partial download
  raise "下载失败:#{e.message}"
end

# Formats a byte count for display: plain bytes below 1 KiB, two-decimal
# KB below 1 MiB, two-decimal MB otherwise.
def format_size(bytes)
  if bytes < 1024
    "#{bytes} B"
  elsif bytes < 1024**2
    format('%.2f KB', bytes / 1024.0)
  else
    format('%.2f MB', bytes / (1024**2).to_f)
  end
end

# Prompts on stdin until the user supplies a syntactically valid
# xiaoyuzhoufm.com episode link, then returns it.
def get_target_url
  loop do
    print "\n请输入小宇宙播客详情页链接(例如:https://www.xiaoyuzhoufm.com/episode/xxx):"
    input = gets.chomp

    if input.empty?
      puts "⚠️ 链接不能为空,请重新输入!"
      next
    end

    # Accept only http/https URLs whose host contains the xiaoyuzhoufm domain.
    acceptable = begin
      parsed = URI.parse(input)
      %w[http https].include?(parsed.scheme) && parsed.host&.include?('xiaoyuzhoufm.com')
    rescue StandardError
      false
    end

    return input if acceptable

    puts "⚠️ 链接格式无效(需包含 http/https 且为小宇宙域名),请重新输入!"
  end
end

# Fetches the episode page HTML with browser-like request headers.
# Raises RuntimeError on non-200 responses, timeouts, or other network errors.
def fetch_page_html(url)
  uri = URI.parse(url)
  puts "\n🔍 正在请求目标页面:#{url}"

  client = Net::HTTP.new(uri.host, uri.port)
  client.use_ssl = (uri.scheme == 'https')
  client.open_timeout = 30
  client.read_timeout = 30

  page_request = Net::HTTP::Get.new(uri)
  # Browser-like headers reduce the chance of the request being blocked.
  page_request['User-Agent'] = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
  page_request['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
  page_request['Referer'] = 'https://www.xiaoyuzhoufm.com/'

  response = client.request(page_request)

  # NOTE(review): as in the original, this raise is caught by the rescue
  # below and re-wrapped with the "网络请求异常" prefix.
  unless response.code.to_i == 200
    raise "页面请求失败,状态码:#{response.code}(可能链接无效或被拦截)"
  end

  response.body
rescue Timeout::Error
  raise "网络请求超时(超过 30 秒),请检查网络连接"
rescue => e
  raise "网络请求异常:#{e.message}"
end

# Pulls the raw JSON payload out of the page's __NEXT_DATA__ script tag.
# Raises RuntimeError when the tag is absent (page structure changed).
def extract_next_data(html)
  found = NEXT_DATA_REGEX.match(html)
  raise "未在页面中找到 __NEXT_DATA__ 字段(可能页面结构已变更)" if found.nil? || found[1].nil?

  found[1].strip
end

# Parses the __NEXT_DATA__ JSON and digs the episode title out along
# TITLE_PATH. Raises RuntimeError when parsing fails, a level of the path
# is missing, or the resulting title is not a non-blank string.
def extract_episode_title(next_data_json)
  data = begin
    JSON.parse(next_data_json)
  rescue JSON::ParserError => e
    raise "JSON 解析失败:#{e.message}(无法提取节目名称)"
  end

  node = data
  TITLE_PATH.each_with_index do |key, depth|
    unless node.is_a?(Hash) && node.key?(key)
      raise "未找到节目名称,缺失层级:#{TITLE_PATH[0..depth].join(' -> ')}"
    end
    node = node[key]
  end

  raise "提取的节目名称无效:#{node.inspect}" unless node.is_a?(String) && !node.strip.empty?

  node.strip
end

# Parses the __NEXT_DATA__ JSON and returns the value found at MEDIA_PATH.
# Raises RuntimeError on parse failure or when a level of the path is absent.
def extract_media_field(next_data_json)
  data = begin
    JSON.parse(next_data_json)
  rescue JSON::ParserError => e
    raise "JSON 解析失败:#{e.message}"
  end

  # Walk the path, reporting exactly how deep we got on failure.
  MEDIA_PATH.each_with_index.reduce(data) do |node, (key, depth)|
    unless node.is_a?(Hash) && node.key?(key)
      raise "未找到media字段,缺失层级:#{MEDIA_PATH[0..depth].join(' -> ')}"
    end
    node[key]
  end
end

# Digs the primary audio URL out of the media hash (media -> source -> url).
# Raises RuntimeError when the hash is malformed, a level is missing, or the
# URL fails validate_url.
def extract_download_url(media_data)
  raise "media字段格式异常(不是哈希类型)" unless media_data.is_a?(Hash)

  node = media_data
  SOURCE_URL_PATH.each_with_index do |key, depth|
    unless node.is_a?(Hash) && node.key?(key)
      path_so_far = (["media"] + SOURCE_URL_PATH[0..depth]).join(' -> ')
      raise "未找到主下载地址,缺失层级:#{path_so_far}"
    end
    node = node[key]
  end

  validate_url(node)
  node
end

# Best-effort lookup of the fallback audio URL (media -> backupSource -> url).
# Unlike extract_download_url this never raises: a missing path or malformed
# URL just logs an info line and returns nil.
def extract_backup_download_url(media_data)
  return nil unless media_data.is_a?(Hash)

  node = media_data
  BACKUP_SOURCE_URL_PATH.each_with_index do |key, depth|
    unless node.is_a?(Hash) && node.key?(key)
      puts "ℹ️ 未找到备用下载地址(缺失层级:media -> #{BACKUP_SOURCE_URL_PATH[0..depth].join(' -> ')})"
      return nil
    end
    node = node[key]
  end

  # Only validate_url can raise past this point; swallow it and return nil.
  validate_url(node)
  node
rescue
  puts "ℹ️ 备用下载地址格式无效:#{node}"
  nil
end

# Validates that +url+ is a non-empty String carrying an http/https URL.
# Returns nil on success; raises RuntimeError with a descriptive message
# otherwise.
#
# Fix: the original raised unconditionally right after its guard clause,
# leaving the URI.parse call and the scheme check unreachable, and its bare
# method-level rescue rewrote every failure as "URL解析失败". Now invalid
# strings, unparseable URLs, and non-http(s) schemes each get their own error.
def validate_url(url)
  unless url.is_a?(String) && !url.strip.empty?
    raise "URL格式无效(非字符串或为空):#{url.inspect}"
  end

  begin
    uri = URI.parse(url)
  rescue URI::InvalidURIError
    raise "URL解析失败:#{url}"
  end

  unless uri.scheme && %w[http https].include?(uri.scheme)
    raise "URL协议错误(需http/https):#{url}"
  end

  nil
end

# Entry point: prompt for the episode link, scrape the page, extract the
# title and download URLs, then download to the desktop — retrying with the
# backup URL if the primary one fails. Exits with status 1 on any error.
def main
  puts "======================================"
  puts "小宇宙播客下载脚本(支持备用地址重试)"
  puts "功能:主地址失败自动尝试备用地址 → 提取节目名称 → 下载到桌面"
  puts "主地址路径:media -> source -> url"
  puts "备用地址路径:media -> backupSource -> url"
  puts "======================================"

  begin
    episode_url = get_target_url
    html = fetch_page_html(episode_url)
    raw_json = extract_next_data(html)

    title = extract_episode_title(raw_json)
    puts "\n📌 提取到节目名称:#{title}"

    media = extract_media_field(raw_json)
    primary_url = extract_download_url(media)
    puts "📥 提取到主下载地址:#{primary_url.split('/')[-1]}"

    # The backup URL is optional; its absence never aborts the run.
    fallback_url = extract_backup_download_url(media)
    puts "📥 提取到备用下载地址:#{fallback_url.split('/')[-1]}" if fallback_url

    # File name comes from the title; the extension from the primary URL
    # (the backup address normally shares the same format).
    target = resolve_save_path(
      desktop_path,
      clean_filename(title),
      extract_file_extension(primary_url)
    )

    begin
      download_with_url(primary_url, target)
    rescue => e
      puts "⚠️ 主地址下载失败:#{e.message}"
      raise "主地址下载失败,且无可用备用地址" unless fallback_url

      puts "🔄 尝试使用备用地址下载..."
      download_with_url(fallback_url, target)
    end

  rescue => e
    puts "\n❌ 执行失败:#{e.message}"
    exit 1
  end

  puts "\n🎉 操作完成!"
end

# 启动脚本
main

0 楼 已删除

想起我的第一个 ruby 脚本也是下载某些内容。

一眼看去,像是 AI 写的代码。

需要 登录 后方可回复, 如果你还没有账号请 注册新账号