From 50092aef604357fd1fdb85bba7413e7d12d43c87 Mon Sep 17 00:00:00 2001
From: minjaesong
Date: Tue, 9 Dec 2025 10:44:32 +0900
Subject: [PATCH] tavdec: multithreaded audio decode

---
 video_encoder/lib/libtavenc/tav_encoder_lib.c |   3 +-
 video_encoder/src/decoder_tav.c               | 438 +++++++++++++++++-
 video_encoder/src/encoder_tav.c               |   9 +-
 3 files changed, 429 insertions(+), 21 deletions(-)

diff --git a/video_encoder/lib/libtavenc/tav_encoder_lib.c b/video_encoder/lib/libtavenc/tav_encoder_lib.c
index 1a43c0d..d25b419 100644
--- a/video_encoder/lib/libtavenc/tav_encoder_lib.c
+++ b/video_encoder/lib/libtavenc/tav_encoder_lib.c
@@ -454,14 +454,13 @@ tav_encoder_context_t *tav_encoder_create(const tav_encoder_params_t *params) {
     }
 
     // Auto-select temporal wavelet if still at default (255=Haar) and temporal DWT enabled
-    // Logic from old encoder: use Haar for large videos, CDF 5/3 for small/low-quality videos
     if (ctx->enable_temporal_dwt && ctx->temporal_wavelet == 255) {
         int num_pixels = ctx->width * ctx->height;
         int use_pure_haar = 0;
 
         // Smart preset based on resolution and quality
         // For large videos with reasonable quality, use Haar (better compression)
-        // For smaller videos or low quality, use CDF 5/3 (better detail preservation)
+        // For smaller videos or low quality, use Haar with sports mode (better motion preservation)
         if ((num_pixels >= 820000 && ctx->quantiser_y <= 29) ||
             (num_pixels >= 500000 && ctx->quantiser_y <= 14) ||
             (num_pixels >= 340000 && ctx->quantiser_y <= 7) ||
diff --git a/video_encoder/src/decoder_tav.c b/video_encoder/src/decoder_tav.c
index aa4caf2..8d3ad7b 100644
--- a/video_encoder/src/decoder_tav.c
+++ b/video_encoder/src/decoder_tav.c
@@ -81,6 +81,22 @@ typedef struct {
 
 } gop_decode_job_t;
 
+// =============================================================================
+// Audio Decode Job Structure (for multithreading)
+// =============================================================================
+
+typedef struct {
+    long file_offset;           // File position for reading
+    uint32_t payload_size;      // Size of compressed audio data
+    uint16_t sample_count;      // Expected sample count
+    uint8_t packet_type;        // TAD_PACKET_AUDIO_TAD or TAD_PACKET_AUDIO_PCM8
+
+    // Output (decoded PCM data)
+    uint8_t *decoded_pcm;       // Stereo PCMu8 output
+    size_t decoded_samples;     // Actual samples decoded
+    volatile int status;        // DECODE_SLOT_*
+} audio_decode_job_t;
+
 // =============================================================================
 // TAV Header Structure (32 bytes)
 // =============================================================================
@@ -151,7 +167,7 @@ typedef struct {
     int no_audio;               // Skip audio decoding
     int dump_packets;           // Debug: dump packet info
 
-    // Threading support
+    // Threading support (video decoding)
     int num_threads;
     int num_slots;
     gop_decode_job_t *slots;
@@ -166,6 +182,18 @@ typedef struct {
     volatile int jobs_submitted;
     volatile int jobs_completed;
 
+    // Audio decoding (pass 1 multithreading)
+    audio_decode_job_t *audio_jobs;
+    int audio_job_count;
+    int audio_job_capacity;
+    pthread_t *audio_worker_threads;
+    int num_audio_threads;
+    pthread_mutex_t audio_mutex;
+    pthread_cond_t audio_cond_job_available;
+    volatile int audio_threads_should_exit;
+    volatile int next_audio_job;   // Next job for worker threads to process
+    volatile int next_audio_write; // Next job to write to temp file
+
 } decoder_context_t;
 
 // =============================================================================
@@ -289,9 +317,8 @@ static int spawn_ffmpeg(decoder_context_t *ctx) {
 
         if (ctx->interlaced) {
             // Interlaced mode: merge separate fields into interlaced frames
-            // tinterlace=interleave_top combines consecutive fields into interlaced frames
-            // Output will be full height (header.height) at half framerate
-            // Field order is set to top-field-first to match encoder
+            // setfield=tff marks each frame as top-field, weave combines consecutive fields
+            // into full-height interlaced frames at half framerate
             if (ctx->output_raw) {
                 // Raw video output (no compression)
                 execl("/usr/bin/ffmpeg", "ffmpeg",
@@ -304,7 +331,7 @@ static int spawn_ffmpeg(decoder_context_t *ctx) {
                       "-ar", "32000",
                       "-ac", "2",
                       "-i", ctx->audio_temp_file,
-                      "-vf", "tinterlace=interleave_top",
+                      "-vf", "setfield=tff,weave",
                       "-field_order", "tt",
                       "-c:v", "rawvideo",
                       "-pixel_format", "rgb24",
@@ -326,7 +353,7 @@ static int spawn_ffmpeg(decoder_context_t *ctx) {
                       "-ar", "32000",
                       "-ac", "2",
                       "-i", ctx->audio_temp_file,
-                      "-vf", "tinterlace=interleave_top",
+                      "-vf", "setfield=tff,weave",
                       "-field_order", "tt",
                       "-color_range", "2",
                       "-c:v", "ffv1",
@@ -1352,6 +1379,347 @@ static int decode_video_pass2_mt(decoder_context_t *ctx) {
     return 0;
 }
 
+// =============================================================================
+// Multithreaded Audio Extraction (Pass 1)
+// =============================================================================
+
+// Decode one audio job into job->decoded_pcm / job->decoded_samples.
+// On any seek, read, allocation or decode failure the job is left with
+// decoded_pcm == NULL and decoded_samples == 0, so the writer skips it.
+static void decode_audio_job(FILE *input_fp, audio_decode_job_t *job) {
+    if (fseek(input_fp, job->file_offset, SEEK_SET) != 0) {
+        return;
+    }
+
+    if (job->packet_type == TAV_PACKET_AUDIO_TAD) {
+        // Read TAD packet data
+        uint8_t *tad_data = malloc(job->payload_size);
+        if (tad_data && fread(tad_data, 1, job->payload_size, input_fp) == job->payload_size) {
+            // Allocate output buffer
+            job->decoded_pcm = malloc((size_t)job->sample_count * 2);
+            if (job->decoded_pcm) {
+                size_t bytes_consumed = 0;
+                int result = tad32_decode_chunk(tad_data, job->payload_size,
+                                                job->decoded_pcm, &bytes_consumed,
+                                                &job->decoded_samples);
+                if (result != 0) {
+                    free(job->decoded_pcm);
+                    job->decoded_pcm = NULL;
+                    job->decoded_samples = 0;
+                }
+            }
+        }
+        // Free outside the branch above so a short read does not leak the buffer
+        free(tad_data);
+    } else if (job->packet_type == TAV_PACKET_AUDIO_PCM8) {
+        // Read PCM8 data directly
+        job->decoded_pcm = malloc(job->payload_size);
+        if (job->decoded_pcm && fread(job->decoded_pcm, 1, job->payload_size, input_fp) == job->payload_size) {
+            job->decoded_samples = job->payload_size / 2; // Stereo
+        } else {
+            free(job->decoded_pcm);
+            job->decoded_pcm = NULL;
+            job->decoded_samples = 0;
+        }
+    }
+}
+
+// Audio worker thread - decodes audio packets in parallel.
+// Claiming a job and publishing its result both happen under audio_mutex;
+// the condition variable is broadcast on completion so the writer can sleep on it.
+static void *audio_worker_thread(void *arg) {
+    decoder_context_t *ctx = (decoder_context_t*)arg;
+    FILE *input_fp = fopen(ctx->input_file, "rb");
+    if (!input_fp) {
+        // Abort the whole extraction instead of returning silently: if every
+        // worker bailed out, the writer would wait forever for unfinished jobs.
+        fprintf(stderr, "Error: Audio worker cannot open %s\n", ctx->input_file);
+        pthread_mutex_lock(&ctx->audio_mutex);
+        ctx->audio_threads_should_exit = 1;
+        pthread_cond_broadcast(&ctx->audio_cond_job_available);
+        pthread_mutex_unlock(&ctx->audio_mutex);
+        return NULL;
+    }
+
+    while (1) {
+        pthread_mutex_lock(&ctx->audio_mutex);
+
+        // Wait for job or exit signal
+        while (ctx->next_audio_job >= ctx->audio_job_count && !ctx->audio_threads_should_exit) {
+            pthread_cond_wait(&ctx->audio_cond_job_available, &ctx->audio_mutex);
+        }
+
+        if (ctx->audio_threads_should_exit) {
+            pthread_mutex_unlock(&ctx->audio_mutex);
+            break;
+        }
+
+        // Claim the next job while still holding the lock
+        audio_decode_job_t *job = &ctx->audio_jobs[ctx->next_audio_job++];
+        job->status = DECODE_SLOT_PROCESSING;
+        pthread_mutex_unlock(&ctx->audio_mutex);
+
+        decode_audio_job(input_fp, job);
+
+        // Publish the result under the lock so the writer sees the PCM data
+        pthread_mutex_lock(&ctx->audio_mutex);
+        job->status = DECODE_SLOT_DONE;
+        pthread_cond_broadcast(&ctx->audio_cond_job_available);
+        pthread_mutex_unlock(&ctx->audio_mutex);
+    }
+
+    fclose(input_fp);
+    return NULL;
+}
+
+// Append a PENDING job to ctx->audio_jobs, doubling the array when it is full.
+// Returns the new job, or NULL on allocation failure; the existing array is
+// left untouched in that case so the caller can still free it.
+static audio_decode_job_t *append_audio_job(decoder_context_t *ctx) {
+    if (ctx->audio_job_count >= ctx->audio_job_capacity) {
+        int new_capacity = ctx->audio_job_capacity * 2;
+        audio_decode_job_t *grown = realloc(ctx->audio_jobs,
+                                            (size_t)new_capacity * sizeof(audio_decode_job_t));
+        if (!grown) return NULL;
+        ctx->audio_jobs = grown;
+        ctx->audio_job_capacity = new_capacity;
+    }
+
+    audio_decode_job_t *job = &ctx->audio_jobs[ctx->audio_job_count++];
+    job->decoded_pcm = NULL;
+    job->decoded_samples = 0;
+    job->status = DECODE_SLOT_PENDING;
+    return job;
+}
+
+// Scan file and collect all audio packet metadata.
+// Returns 0 on success, or -1 on allocation failure (ctx->audio_jobs is freed).
+// The read position of ctx->input_fp is restored before returning.
+static int collect_audio_packets(decoder_context_t *ctx) {
+    long current_pos = ftell(ctx->input_fp);
+    int rc = 0;
+
+    ctx->audio_job_capacity = 1024;
+    ctx->audio_job_count = 0;
+    ctx->audio_jobs = malloc(ctx->audio_job_capacity * sizeof(audio_decode_job_t));
+    if (!ctx->audio_jobs) return -1;
+
+    // Scan through file
+    while (1) {
+        uint8_t packet_type;
+
+        if (fread(&packet_type, 1, 1, ctx->input_fp) != 1) break;
+
+        if (packet_type == TAV_PACKET_AUDIO_TAD) {
+            // TAD packet: [sample_count(2)][payload_size+7(4)][payload...]
+            uint16_t sample_count;
+            uint32_t payload_size_plus_7;
+
+            if (fread(&sample_count, 2, 1, ctx->input_fp) != 1) break;
+            if (fread(&payload_size_plus_7, 4, 1, ctx->input_fp) != 1) break;
+
+            audio_decode_job_t *job = append_audio_job(ctx);
+            if (!job) {
+                rc = -1;
+                break;
+            }
+            job->file_offset = ftell(ctx->input_fp);
+            job->payload_size = payload_size_plus_7;
+            job->sample_count = sample_count;
+            job->packet_type = TAV_PACKET_AUDIO_TAD;
+
+            fseek(ctx->input_fp, payload_size_plus_7, SEEK_CUR);
+
+        } else if (packet_type == TAV_PACKET_AUDIO_PCM8) {
+            // PCM8 packet: [size(4)][pcm_data]
+            uint32_t pcm_size;
+            if (fread(&pcm_size, 4, 1, ctx->input_fp) != 1) break;
+
+            audio_decode_job_t *job = append_audio_job(ctx);
+            if (!job) {
+                rc = -1;
+                break;
+            }
+            job->file_offset = ftell(ctx->input_fp);
+            job->payload_size = pcm_size;
+            job->sample_count = pcm_size / 2;
+            job->packet_type = TAV_PACKET_AUDIO_PCM8;
+
+            fseek(ctx->input_fp, pcm_size, SEEK_CUR);
+
+        } else {
+            // Skip other packet types
+            if (packet_type == TAV_PACKET_GOP_UNIFIED) {
+                uint8_t gop_size;
+                uint32_t compressed_size;
+                if (fread(&gop_size, 1, 1, ctx->input_fp) != 1) break;
+                if (fread(&compressed_size, 4, 1, ctx->input_fp) != 1) break;
+                fseek(ctx->input_fp, compressed_size, SEEK_CUR);
+            } else if (packet_type == TAV_PACKET_IFRAME) {
+                uint32_t compressed_size;
+                if (fread(&compressed_size, 4, 1, ctx->input_fp) != 1) break;
+                fseek(ctx->input_fp, compressed_size, SEEK_CUR);
+            } else if (packet_type == TAV_PACKET_EXTENDED_HDR) {
+                uint16_t num_pairs;
+                if (fread(&num_pairs, 2, 1, ctx->input_fp) != 1) break;
+                for (int i = 0; i < num_pairs; i++) {
+                    uint8_t kv_header[5];
+                    if (fread(kv_header, 1, 5, ctx->input_fp) != 5) break;
+                    uint8_t value_type = kv_header[4];
+                    if (value_type == 0x04) {
+                        fseek(ctx->input_fp, 8, SEEK_CUR);
+                    } else if (value_type == 0x10) {
+                        uint16_t length;
+                        if (fread(&length, 2, 1, ctx->input_fp) != 1) break;
+                        fseek(ctx->input_fp, length, SEEK_CUR);
+                    } else if (value_type <= 0x04) {
+                        int sizes[] = {2, 3, 4, 6, 8};
+                        fseek(ctx->input_fp, sizes[value_type], SEEK_CUR);
+                    }
+                }
+            } else if (packet_type == TAV_PACKET_SCREEN_MASK) {
+                fseek(ctx->input_fp, 4, SEEK_CUR);
+            } else if (packet_type == TAV_PACKET_GOP_SYNC) {
+                fseek(ctx->input_fp, 1, SEEK_CUR);
+            } else if (packet_type == TAV_PACKET_TIMECODE) {
+                fseek(ctx->input_fp, 8, SEEK_CUR);
+            } else if (packet_type == TAV_PACKET_SYNC_NTSC ||
+                       packet_type == TAV_PACKET_SYNC) {
+                // No payload
+            } else {
+                // Unknown packet - try to skip by reading size
+                uint32_t size;
+                if (fread(&size, 4, 1, ctx->input_fp) != 1) break;
+                if (size < 1000000) {
+                    fseek(ctx->input_fp, size, SEEK_CUR);
+                } else {
+                    break; // Likely corrupt
+                }
+            }
+        }
+    }
+
+    if (rc != 0) {
+        free(ctx->audio_jobs);
+        ctx->audio_jobs = NULL;
+        ctx->audio_job_count = 0;
+    }
+
+    // Restore file position
+    fseek(ctx->input_fp, current_pos, SEEK_SET);
+    return rc;
+}
+
+// Extract audio using multiple threads.
+// Workers decode packets in parallel while this thread writes the results to
+// the temp file in packet order. Returns 0 on success, -1 on failure.
+static int extract_audio_mt(decoder_context_t *ctx) {
+    int rc = 0;
+    int threads_started = 0;
+
+    // Collect all audio packet metadata
+    if (collect_audio_packets(ctx) < 0) {
+        fprintf(stderr, "Error: Failed to collect audio packets\n");
+        return -1;
+    }
+
+    if (ctx->audio_job_count == 0) {
+        // No audio packets found
+        free(ctx->audio_jobs);
+        ctx->audio_jobs = NULL;
+        return 0;
+    }
+
+    if (ctx->verbose) {
+        printf(" Found %d audio packets\n", ctx->audio_job_count);
+    }
+
+    // Initialize audio threading
+    ctx->num_audio_threads = ctx->num_threads > 0 ? ctx->num_threads : 1;
+    ctx->next_audio_job = 0;
+    ctx->next_audio_write = 0;
+    ctx->audio_threads_should_exit = 0;
+
+    pthread_mutex_init(&ctx->audio_mutex, NULL);
+    pthread_cond_init(&ctx->audio_cond_job_available, NULL);
+
+    // Create worker threads
+    ctx->audio_worker_threads = malloc(ctx->num_audio_threads * sizeof(pthread_t));
+    if (!ctx->audio_worker_threads) {
+        rc = -1;
+        goto cleanup;
+    }
+
+    for (int i = 0; i < ctx->num_audio_threads; i++) {
+        if (pthread_create(&ctx->audio_worker_threads[i], NULL,
+                           audio_worker_thread, ctx) != 0) {
+            fprintf(stderr, "Error: Failed to create audio worker thread %d\n", i);
+            rc = -1;
+            break;
+        }
+        threads_started++;
+    }
+
+    // Write decoded audio in order (skipped if thread creation failed)
+    for (int i = 0; rc == 0 && i < ctx->audio_job_count; i++) {
+        audio_decode_job_t *job = &ctx->audio_jobs[i];
+
+        // Wait for this job to complete, or for a worker to report failure
+        pthread_mutex_lock(&ctx->audio_mutex);
+        while (job->status != DECODE_SLOT_DONE && !ctx->audio_threads_should_exit) {
+            pthread_cond_wait(&ctx->audio_cond_job_available, &ctx->audio_mutex);
+        }
+        int job_done = (job->status == DECODE_SLOT_DONE);
+        pthread_mutex_unlock(&ctx->audio_mutex);
+
+        if (!job_done) {
+            rc = -1;
+            break;
+        }
+
+        // Write to temp file
+        if (job->decoded_pcm && job->decoded_samples > 0 && ctx->audio_temp_fp) {
+            size_t bytes = job->decoded_samples * 2;
+            if (fwrite(job->decoded_pcm, 1, bytes, ctx->audio_temp_fp) != bytes) {
+                fprintf(stderr, "Error: Failed to write decoded audio\n");
+                rc = -1;
+                break;
+            }
+            ctx->audio_samples_decoded += job->decoded_samples;
+        }
+
+        // Release each buffer once written so memory use stays bounded
+        free(job->decoded_pcm);
+        job->decoded_pcm = NULL;
+    }
+
+    // Signal threads to exit
+    pthread_mutex_lock(&ctx->audio_mutex);
+    ctx->audio_threads_should_exit = 1;
+    pthread_cond_broadcast(&ctx->audio_cond_job_available);
+    pthread_mutex_unlock(&ctx->audio_mutex);
+
+    // Wait for the threads that were actually started
+    for (int i = 0; i < threads_started; i++) {
+        pthread_join(ctx->audio_worker_threads[i], NULL);
+    }
+
+cleanup:
+    for (int i = 0; i < ctx->audio_job_count; i++) {
+        free(ctx->audio_jobs[i].decoded_pcm);
+    }
+    free(ctx->audio_jobs);
+    free(ctx->audio_worker_threads);
+    ctx->audio_jobs = NULL;
+    ctx->audio_worker_threads = NULL;
+    ctx->audio_job_count = 0;
+
+    pthread_mutex_destroy(&ctx->audio_mutex);
+    pthread_cond_destroy(&ctx->audio_cond_job_available);
+
+    return rc;
+}
+
 // =============================================================================
 // Main Decoding Loop
 // =============================================================================
@@ -1368,11 +1736,21 @@ static int decode_video(decoder_context_t *ctx) {
 
     // Pass 1: Audio extraction
     if (!ctx->no_audio) {
-        printf("Pass 1: Extracting audio...\n");
-        while (process_packet(ctx) == 0) {
-            // Check decode limit
-            if (ctx->decode_limit > 0 && ctx->frames_decoded >= (uint64_t)ctx->decode_limit) {
-                break;
+        printf("Pass 1: Extracting audio");
+        if (ctx->num_threads > 0) {
+            printf(" (%d threads)...\n", ctx->num_threads);
+            if (extract_audio_mt(ctx) < 0) {
+                fprintf(stderr, "Error: Multithreaded audio extraction failed\n");
+                return -1;
+            }
+        } else {
+            printf("...\n");
+            // Fallback to single-threaded
+            while (process_packet(ctx) == 0) {
+                // Check decode limit
+                if (ctx->decode_limit > 0 && ctx->frames_decoded >= (uint64_t)ctx->decode_limit) {
+                    break;
+                }
             }
         }
 
@@ -1464,6 +1842,28 @@ static char *generate_output_filename(const char *input_file) {
     return output;
 }
 
+/**
+ * Get number of available CPU cores.
+ * Returns the number of online processors, or 1 on error.
+ */
+static int get_available_cpus(void) {
+#ifdef _SC_NPROCESSORS_ONLN
+    long nproc = sysconf(_SC_NPROCESSORS_ONLN);
+    if (nproc > 0) {
+        return (int)nproc;
+    }
+#endif
+    return 1; // Fallback to single core
+}
+
+/**
+ * Get default thread count (cap at 8)
+ */
+static int get_default_thread_count(void) {
+    int available = get_available_cpus();
+    return available < 8 ? available : 8;
+}
+
 static void print_usage(const char *program) {
     printf("TAV Decoder - TSVM Advanced Video Codec (Reference Implementation)\n");
     printf("\nUsage: %s -i input.tav [-o output.mkv] [options]\n\n", program);
@@ -1490,6 +1890,9 @@ int main(int argc, char *argv[]) {
 
     decoder_context_t ctx = {0};
 
+    // Initialize threading
+    ctx.num_threads = get_default_thread_count();
+
     // Command-line options
     static struct option long_options[] = {
         {"input", required_argument, 0, 'i'},
@@ -1516,9 +1919,16 @@ int main(int argc, char *argv[]) {
             case 'v':
                 ctx.verbose = 1;
                 break;
-            case 't':
-                ctx.num_threads = atoi(optarg);
+            case 't': { // --threads
+                int threads = atoi(optarg);
+                if (threads < 0) {
+                    fprintf(stderr, "Error: Thread count must be non-negative\n");
+                    return 1;
+                }
+                // Both 0 and 1 mean single-threaded (use value 0 internally)
+                ctx.num_threads = (threads <= 1) ? 0 : threads;
                 break;
+            }
             case 1001:
                 ctx.output_raw = 1;
                 break;
@@ -1535,7 +1945,7 @@ int main(int argc, char *argv[]) {
             case '?':
             default:
                 print_usage(argv[0]);
-                return (c == 'h') ? 0 : 1;
+                return (c == 'h' || c == '?') ? 0 : 1;
         }
     }
 
diff --git a/video_encoder/src/encoder_tav.c b/video_encoder/src/encoder_tav.c
index 0a22e2c..936d774 100644
--- a/video_encoder/src/encoder_tav.c
+++ b/video_encoder/src/encoder_tav.c
@@ -34,7 +34,6 @@
 // Multithreading Structures
 // =============================================================================
 
-#define MAX_THREADS 16
 #define GOP_SLOT_EMPTY 0
 #define GOP_SLOT_READY 1
 #define GOP_SLOT_ENCODING 2
@@ -263,7 +262,7 @@ static int get_available_cpus(void) {
 }
 
 /**
- * Get default thread count: min(8, available_cpus)
+ * Get default thread count (cap at 8)
  */
 static int get_default_thread_count(void) {
     int available = get_available_cpus();
@@ -2245,7 +2244,7 @@ int main(int argc, char *argv[]) {
     cli.audio_quality = -1; // Will match video quality if not specified
     cli.use_native_audio = 0; // TAD by default
 
-    // Initialize threading defaults: min(8, available CPUs)
+    // Initialize threading
     cli.num_threads = get_default_thread_count();
 
     // Command-line options
@@ -2438,8 +2437,8 @@ int main(int argc, char *argv[]) {
                 break;
             case 't': { // --threads
                 int threads = atoi(optarg);
-                if (threads < 0 || threads > MAX_THREADS) {
-                    fprintf(stderr, "Error: Thread count must be 0-%d\n", MAX_THREADS);
+                if (threads < 0) {
+                    fprintf(stderr, "Error: Thread count must be non-negative\n");
                     return 1;
                 }
                 // Both 0 and 1 mean single-threaded (use value 0 internally)