# Velour Phase 3: Remote Sources & Import

Phase 3 extends the video library to support remote storage sources like S3, JellyFin servers, and web directories. This allows users to access and import videos from multiple locations.

## Extended Storage Architecture

### New Storage Location Types

```ruby
class StorageLocation < ApplicationRecord
  has_many :videos, dependent: :destroy

  validates :name, presence: true
  validates :storage_type, presence: true, inclusion: { in: %w[local s3 jellyfin web] }

  # Per-type connection settings live in one JSON-serialized column.
  # NOTE: :username is shared by the JellyFin and web configurations — it is
  # listed only once, because listing it twice would define duplicate accessors.
  store :configuration, accessors: [
    # S3 configuration
    :bucket, :region, :access_key_id, :secret_access_key, :endpoint,
    # JellyFin configuration
    :server_url, :api_key,
    # Web directory configuration (:username is also used for JellyFin)
    :base_url, :auth_type, :username, :password, :headers
  ], coder: JSON

  # Storage-type specific validations
  validate :validate_s3_configuration, if: -> { s3? }
  validate :validate_jellyfin_configuration, if: -> { jellyfin? }
  validate :validate_web_configuration, if: -> { web? }

  enum storage_type: { local: 0, s3: 1, jellyfin: 2, web: 3 }

  # Any non-local source is remote. Used by the import UI below, which was
  # previously calling a method that did not exist.
  def remote?
    !local?
  end

  # True when the configured backend can currently be reached.
  def accessible?
    case storage_type
    when 'local'
      File.exist?(path) && File.readable?(path)
    when 's3'
      s3_client&.bucket(bucket)&.exists?
    when 'jellyfin'
      jellyfin_client&.ping?
    when 'web'
      web_accessible?
    else
      false
    end
  end

  # Returns the scanner service matching this storage type.
  def scanner
    case storage_type
    when 'local'
      LocalFileScanner.new(self)
    when 's3'
      S3Scanner.new(self)
    when 'jellyfin'
      JellyFinScanner.new(self)
    when 'web'
      WebDirectoryScanner.new(self)
    end
  end

  # Returns the streamer service matching this storage type.
  def streamer
    case storage_type
    when 'local'
      LocalStreamer.new(self)
    when 's3'
      S3Streamer.new(self)
    when 'jellyfin'
      JellyFinStreamer.new(self)
    when 'web'
      WebStreamer.new(self)
    end
  end

  private

  def validate_s3_configuration
    %w[bucket region access_key_id secret_access_key].each do |field|
      errors.add(:configuration, "#{field} is required for S3 storage") if send(field).blank?
    end
  end

  def validate_jellyfin_configuration
    %w[server_url api_key].each do |field|
      errors.add(:configuration, "#{field} is required for JellyFin storage") if send(field).blank?
    end
  end

  def validate_web_configuration
    errors.add(:configuration, "base_url is required for web storage") if base_url.blank?
  end
end
```

## S3 Storage Implementation

### S3 Scanner Service

```ruby
class S3Scanner
  def initialize(storage_location)
    @storage_location = storage_location
    @client = s3_client
  end

  # Scans the bucket for video files and registers any new ones.
  # Returns a result hash: { success:, videos:, message: }.
  def scan
    return failure_result("S3 bucket not accessible") unless @storage_location.accessible?

    video_files = find_video_files_in_s3
    new_videos = process_s3_files(video_files)

    success_result(new_videos)
  rescue Aws::Errors::ServiceError => e
    failure_result("S3 error: #{e.message}")
  end

  private

  def s3_client
    @s3_client ||= Aws::S3::Client.new(
      region: @storage_location.region,
      access_key_id: @storage_location.access_key_id,
      secret_access_key: @storage_location.secret_access_key,
      endpoint: @storage_location.endpoint # Optional for S3-compatible services
    )
  end

  # Lists every object in the bucket whose key ends with a known video extension.
  def find_video_files_in_s3
    bucket = Aws::S3::Bucket.new(@storage_location.bucket, client: s3_client)
    video_extensions = %w[.mp4 .avi .mkv .mov .wmv .flv .webm .m4v]

    bucket.objects(prefix: "")
          .select { |obj| video_extensions.any? { |ext| obj.key.end_with?(ext) } }
          .to_a
  end

  # Creates Video records for objects not already known, enqueuing processing
  # for each; returns the newly created videos.
  def process_s3_files(s3_objects)
    new_videos = []

    s3_objects.each do |s3_object|
      filename = File.basename(s3_object.key)
      next if Video.exists?(filename: filename, storage_location: @storage_location)

      video = Video.create!(
        filename: filename,
        storage_location: @storage_location,
        work: Work.find_or_create_by(title: extract_title(filename)),
        file_size: s3_object.size,
        video_metadata: {
          remote_url: s3_object.key,
          last_modified: s3_object.last_modified
        }
      )

      new_videos << video
      VideoProcessorJob.perform_later(video.id)
    end

    new_videos
  end

  # Strips bracketed/parenthesized tags and the extension from a filename.
  def extract_title(filename)
    File.basename(filename, ".*").gsub(/[\[\(].*?[\]\)]/, "").strip
  end

  def success_result(videos = [])
    { success: true, videos: videos, message: "Found #{videos.length} new videos in S3" }
  end

  def failure_result(message)
    { success: false, message: message }
  end
end
```

### S3 Streamer

```ruby
class S3Streamer
  def initialize(storage_location)
    @storage_location = storage_location
    @client = s3_client
  end

  # Streams a video from S3, honoring HTTP byte-range requests for seeking.
  # Returns { body:, status:, headers: } suitable for a Rails streaming response.
  def stream(video, range: nil)
    if range
      # Handle byte-range requests for seeking
      range_header = "bytes=#{range}"
      resp = @client.get_object(
        bucket: @storage_location.bucket,
        key: video.video_metadata['remote_url'],
        range: range_header
      )

      {
        body: resp.body,
        status: 206, # Partial content
        headers: {
          'Content-Range' => "bytes #{range}/#{total_size_for(video)}",
          'Content-Length' => resp.content_length,
          'Accept-Ranges' => 'bytes',
          'Content-Type' => 'video/mp4'
        }
      }
    else
      resp = @client.get_object(
        bucket: @storage_location.bucket,
        key: video.video_metadata['remote_url']
      )

      {
        body: resp.body,
        status: 200,
        headers: {
          'Content-Length' => resp.content_length,
          'Content-Type' => 'video/mp4'
        }
      }
    end
  end

  # Generates a time-limited signed URL so clients can stream directly from S3.
  def presigned_url(video, expires_in: 1.hour)
    signer = Aws::S3::Presigner.new(client: @client)
    signer.presigned_url(
      :get_object,
      bucket: @storage_location.bucket,
      key: video.video_metadata['remote_url'],
      expires_in: expires_in.to_i
    )
  end

  private

  def s3_client
    @s3_client ||= Aws::S3::Client.new(
      region: @storage_location.region,
      access_key_id: @storage_location.access_key_id,
      secret_access_key: @storage_location.secret_access_key,
      endpoint: @storage_location.endpoint
    )
  end

  # Full object size for the Content-Range header. Uses HEAD — the previous
  # implementation issued a full GET (downloading the whole body) just to read
  # the size, and read a `size` attribute GetObjectOutput does not provide.
  def total_size_for(video)
    @client.head_object(
      bucket: @storage_location.bucket,
      key: video.video_metadata['remote_url']
    ).content_length
  end
end
```

## JellyFin Integration

### JellyFin Client

```ruby
class JellyFinClient
  def initialize(server_url:, api_key:, username: nil)
    @server_url = server_url.chomp('/')
    @api_key = api_key
    @username = username
    @http = Faraday.new(url: @server_url) do |faraday|
      faraday.headers['X-Emby-Token'] = @api_key
      faraday.adapter Faraday.default_adapter
    end
  end

  # True when the server answers the ping endpoint; false on any error.
  def ping?
    response = @http.get('/System/Ping')
    response.success?
  rescue
    false
  end

  def libraries
    response = @http.get('/Library/VirtualFolders')
    JSON.parse(response.body)
  end

  def movies(library_id = nil)
    path = library_id ?
      "/Users/#{user_id}/Items?ParentId=#{library_id}&IncludeItemTypes=Movie&Recursive=true" :
      "/Users/#{user_id}/Items?IncludeItemTypes=Movie&Recursive=true"
    response = @http.get(path)
    JSON.parse(response.body)['Items']
  end

  def tv_shows(library_id = nil)
    path = library_id ?
      "/Users/#{user_id}/Items?ParentId=#{library_id}&IncludeItemTypes=Series&Recursive=true" :
      "/Users/#{user_id}/Items?IncludeItemTypes=Series&Recursive=true"
    response = @http.get(path)
    JSON.parse(response.body)['Items']
  end

  def episodes(show_id)
    response = @http.get("/Shows/#{show_id}/Episodes?UserId=#{user_id}")
    JSON.parse(response.body)['Items']
  end

  # NOTE(review): embeds the API key in the URL query string, so it can leak
  # into logs and browser history — consider a short-lived token if the server
  # supports one.
  def streaming_url(item_id)
    "#{@server_url}/Videos/#{item_id}/stream?Static=true&MediaSourceId=#{item_id}&DeviceId=Velour&api_key=#{@api_key}"
  end

  def item_details(item_id)
    response = @http.get("/Users/#{user_id}/Items/#{item_id}")
    JSON.parse(response.body)
  end

  private

  # Resolves (and memoizes) the JellyFin user id: the named user when
  # @username is given, otherwise the first user on the server.
  def user_id
    @user_id ||= begin
      response = @http.get('/Users')
      users = JSON.parse(response.body)

      if @username
        user = users.find { |u| u['Name'] == @username }
        user&.dig('Id') || users.first['Id']
      else
        users.first['Id']
      end
    end
  end
end
```

### JellyFin Scanner

```ruby
class JellyFinScanner
  def initialize(storage_location)
    @storage_location = storage_location
    @client = jellyfin_client
  end

  # Pulls movies plus every episode of every series, and registers new items.
  # Returns a result hash: { success:, videos:, message: }.
  def scan
    return failure_result("JellyFin server not accessible") unless @storage_location.accessible?

    movies = @client.movies
    shows = @client.tv_shows

    episodes = []
    shows.each do |show|
      episodes.concat(@client.episodes(show['Id']))
    end

    all_items = movies + episodes
    new_videos = process_jellyfin_items(all_items)

    success_result(new_videos)
  rescue => e
    failure_result("JellyFin error: #{e.message}")
  end

  private

  def jellyfin_client
    @client ||= JellyFinClient.new(
      server_url: @storage_location.server_url,
      api_key: @storage_location.api_key,
      username: @storage_location.username
    )
  end

  # Creates Video + Work records for unseen JellyFin items. The JellyFin item
  # id doubles as the Video filename so re-scans are idempotent.
  def process_jellyfin_items(items)
    new_videos = []

    items.each do |item|
      next unless item['MediaType'] == 'Video'

      title = item['Name']
      year = item['ProductionYear']
      work_title = year ? "#{title} (#{year})" : title

      work = Work.find_or_create_by(title: work_title) do |w|
        w.year = year
        w.description = item['Overview']
      end

      video = Video.find_or_initialize_by(
        filename: item['Id'],
        storage_location: @storage_location
      )

      if video.new_record?
        video.update!(
          work: work,
          video_metadata: {
            jellyfin_id: item['Id'],
            media_type: item['Type'],
            # RunTimeTicks are 100ns units; divide to get seconds
            runtime: item['RunTimeTicks'] ? item['RunTimeTicks'] / 10_000_000 : nil,
            premiere_date: item['PremiereDate'],
            community_rating: item['CommunityRating'],
            genres: item['Genres']
          }
        )

        new_videos << video
        VideoProcessorJob.perform_later(video.id)
      end
    end

    new_videos
  end

  def success_result(videos = [])
    { success: true, videos: videos, message: "Found #{videos.length} new videos from JellyFin" }
  end

  def failure_result(message)
    { success: false, message: message }
  end
end
```

### JellyFin Streamer

```ruby
class JellyFinStreamer
  def initialize(storage_location)
    @storage_location = storage_location
    @client = jellyfin_client
  end

  # Proxies the JellyFin stream through the app, preserving range requests.
  def stream(video, range: nil)
    jellyfin_id = video.video_metadata['jellyfin_id']
    stream_url = @client.streaming_url(jellyfin_id)

    # For JellyFin, we typically proxy the stream
    if range
      proxy_stream_with_range(stream_url, range)
    else
      proxy_stream(stream_url)
    end
  end

  private

  def jellyfin_client
    @client ||= JellyFinClient.new(
      server_url: @storage_location.server_url,
      api_key: @storage_location.api_key,
      username: @storage_location.username
    )
  end

  def proxy_stream(url)
    response = Faraday.get(url)
    { body: response.body, status: response.status, headers: response.headers }
  end

  def proxy_stream_with_range(url, range)
    response = Faraday.get(url, nil, { 'Range' => "bytes=#{range}" })
    { body: response.body, status: response.status, headers: response.headers }
  end
end
```

## Video Import System

### Import Job with Progress Tracking

```ruby
class VideoImportJob < ApplicationJob
  include ActiveJob::Statuses

  # Imports a remote video into a local storage location, reporting progress
  # through the job-status store (download = first 50%, processing = rest).
  def perform(video_id, destination_storage_location_id)
    video = Video.find(video_id)
    destination = StorageLocation.find(destination_storage_location_id)

    progress.update(stage: "download", total: 100, current: 0)

    # Download file from source
    downloaded_file = download_video(video, destination) do |current, total|
      progress.update(current: (current.to_f / total * 50).to_i) # Download is 50% of progress
    end

    progress.update(stage: "process", total: 100, current: 50)

    # Create new video record in destination
    new_video = Video.create!(
      filename: video.filename,
      storage_location: destination,
      work: video.work,
      file_size: video.file_size
    )

    # Copy file to destination
    destination_path = File.join(destination.path, video.filename)
    FileUtils.cp(downloaded_file.path, destination_path)

    # Process the new video
    VideoProcessorJob.perform_later(new_video.id)

    progress.update(stage: "complete", total: 100, current: 100)
  ensure
    # Close and unlink the temp file even when the import fails partway —
    # previously it was only deleted on the success path, leaking a tempfile
    # on every failed import.
    downloaded_file&.close!
  end

  private

  # Dispatches to the downloader for the source's storage type.
  def download_video(video, destination, &block)
    case video.storage_location.storage_type
    when 's3'
      download_from_s3(video, &block)
    when 'jellyfin'
      download_from_jellyfin(video, &block)
    when 'web'
      download_from_web(video, &block)
    else
      raise "Unsupported import from #{video.storage_location.storage_type}"
    end
  end

  def download_from_s3(video, &block)
    temp_file = Tempfile.new(['video_import', File.extname(video.filename)])

    s3_client = Aws::S3::Client.new(
      region: video.storage_location.region,
      access_key_id: video.storage_location.access_key_id,
      secret_access_key: video.storage_location.secret_access_key
    )

    # NOTE(review): get_object streams chunks to a block OR writes to
    # response_target — verify against the AWS SDK that combining both
    # reports progress as intended.
    s3_client.get_object(
      bucket: video.storage_location.bucket,
      key: video.video_metadata['remote_url'],
      response_target: temp_file.path
    ) do |chunk|
      yield(chunk.bytes_written, chunk.size) if block_given?
    end

    temp_file
  end

  def download_from_jellyfin(video, &block)
    temp_file = Tempfile.new(['video_import', File.extname(video.filename)])
    temp_file.binmode # video data is binary; text mode could corrupt it

    jellyfin_id = video.video_metadata['jellyfin_id']
    client = JellyFinClient.new(
      server_url: video.storage_location.server_url,
      api_key: video.storage_location.api_key
    )

    stream_url = client.streaming_url(jellyfin_id)

    # Download with progress tracking
    uri = URI(stream_url)
    Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http|
      request = Net::HTTP::Get.new(uri)

      http.request(request) do |response|
        total_size = response['Content-Length'].to_i
        downloaded = 0

        response.read_body do |chunk|
          temp_file.write(chunk)
          downloaded += chunk.bytesize
          yield(downloaded, total_size) if block_given?
        end
      end
    end

    # Flush buffered writes so the subsequent FileUtils.cp sees the full file.
    temp_file.flush
    temp_file
  end
end
```

### Import UI

```erb
<% if video.storage_location.remote? && current_user.admin? %>
Import "<%= video.filename %>" to a local storage location for offline access and transcoding.
Importing video...
Starting...
Import failed
${error.message}
Import complete!
Import failed
${progress.error || "Unknown error"}