diff --git a/video_encoder/encoder_tav.c b/video_encoder/encoder_tav.c index 136802e..328a4a4 100644 --- a/video_encoder/encoder_tav.c +++ b/video_encoder/encoder_tav.c @@ -17,6 +17,7 @@ #include #include #include +#include // C11 threads for multi-threading #include "tav_avx512.h" // AVX-512 SIMD optimisations #define ENCODER_VENDOR_STRING "Encoder-TAV 20251128 (3d-dwt,tad,ssf-tc,cdf53-motion,avx512,presets)" @@ -2007,8 +2008,123 @@ typedef struct tav_encoder_s { int two_pass_current_frame; // Current frame number in second pass char *two_pass_analysis_file; // Temporary file for storing analysis data (NULL = in-memory) + // Multi-threading support + int num_threads; // 0 or 1 = single-threaded + struct thread_pool *thread_pool; // NULL if single-threaded + } tav_encoder_t; +// ============================================================================= +// Multi-Threading Data Structures +// ============================================================================= + +// GOP slot status values +#define GOP_STATUS_EMPTY 0 // Slot is available for filling +#define GOP_STATUS_FILLING 1 // Producer is filling this slot +#define GOP_STATUS_READY 2 // Slot is ready for encoding +#define GOP_STATUS_ENCODING 3 // Worker is encoding this slot +#define GOP_STATUS_COMPLETE 4 // Encoding complete, ready for writing + +// GOP slot (circular buffer element) +typedef struct gop_slot { + // Status + volatile int status; // GOP_STATUS_* values + int gop_index; // Sequential GOP number for ordering + + // Input data (NEW: frame indices instead of pointers for complete isolation) + int frame_start_index; // Start index into global frame pool + int num_frames; // Number of frames in this GOP + int *frame_numbers; // Original frame indices (for timecodes) + float *pcm_samples; // [num_audio_samples*2] stereo L,R,L,R,... + size_t num_audio_samples; // Samples per channel + int width, height; // Frame dimensions (may be cropped) + + // Crop encoding metadata (two-pass mode) + uint16_t mask_top, mask_right, mask_bottom, mask_left; + int max_active_width, max_active_height; + int enable_crop_encoding; + + // Output data (filled by worker) + uint8_t *video_packet; // Complete video packet (header + payload) + size_t video_packet_size; // Total bytes + uint8_t **audio_packets; // [num_audio_packets][packet_size] + size_t *audio_packet_sizes; // Size of each audio packet + int num_audio_packets; // Count (1 for TAD, N for PCM8/MP2) + + // Error handling + int encoding_failed; // 1 if error occurred + char error_message[256]; // Error description + + // Synchronization + mtx_t mutex; // Protects this structure + cnd_t status_changed; // Signal when status updates +} gop_slot_t; + +// Thread-local worker context +typedef struct thread_encoder_context { + int thread_id; + struct thread_pool *pool; + + // Thread-local work buffers (reused across GOPs) + float **work_y_frames; // [max_gop_size][max_pixels] + float **work_co_frames; + float **work_cg_frames; + int16_t **quantized_y; + int16_t **quantized_co; + int16_t **quantized_cg; + uint8_t *compression_buffer; + size_t compression_buffer_size; + ZSTD_CCtx *zstd_ctx; + + // Buffer sizing + int max_gop_frames; // Maximum frames per GOP + size_t max_frame_pixels; // Maximum pixels per frame +} thread_encoder_context_t; + +// Thread pool +typedef struct thread_pool { + int num_threads; // Worker thread count + thrd_t *worker_threads; // Thread handles + thrd_t producer_thread; // Producer thread handle + thrd_t writer_thread; // Writer thread handle + + // Circular buffer of GOP slots + gop_slot_t *slots; // Array of slots + int num_slots; // N = 2 * num_threads + int slot_capacity; // Max frames per GOP + + // Global frame pool (complete frame isolation) + uint8_t **global_frame_pool; // [total_frames][width*height*3] + int total_frames; // Total frames in pool + size_t frame_rgb_size; // Bytes per frame (width*height*3) + + // Producer state + int next_gop_to_fill; // Next GOP index to produce + int total_gops_produced; // Total produced so far + int total_frames_produced; // Total frames produced + int producer_finished; // 1 when no more frames, -1 on error + + // Writer state + int next_gop_to_write; // Next GOP index to write + int total_gops_written; // Total written so far + + // Job queue for workers (indices into slots) + int *job_queue; // Circular queue of slot indices + int job_queue_head; + int job_queue_tail; + int job_queue_size; + int job_queue_capacity; + mtx_t job_queue_mutex; + cnd_t job_available; // Signal when new job available + cnd_t slot_available; // Signal when slot becomes empty + + // Shutdown signal + int shutdown; + + // Shared encoder config (read-only) + tav_encoder_t *shared_enc; +} thread_pool_t; + // Calculate maximum decomposition levels for a given frame size static int calculate_max_decomp_levels(tav_encoder_t *enc, int width, int height) { int levels = 0; @@ -2357,6 +2473,14 @@ static int process_audio_for_gop(tav_encoder_t *enc, int *frame_numbers, int num static subtitle_entry_t* parse_subtitle_file(const char *filename, int fps); static subtitle_entry_t* parse_srt_file(const char *filename, int fps); static subtitle_entry_t* parse_smi_file(const char *filename, int fps); + +// Multi-threading function prototypes +static gop_slot_t* init_gop_slots(int num_slots, int width, int height, int capacity); +static gop_slot_t* get_empty_slot(thread_pool_t *pool, int *slot_index); +static void free_gop_slot(gop_slot_t *slot); +static void destroy_gop_slots(gop_slot_t *slots, int num_slots, int capacity); +static thread_pool_t* create_thread_pool(tav_encoder_t *enc, int num_threads, int total_gops); +static void shutdown_thread_pool(thread_pool_t *pool); static int srt_time_to_frame(const char *time_str, int fps); static int sami_ms_to_frame(int milliseconds, int fps); static void free_subtitle_list(subtitle_entry_t *list); @@ -2384,6 +2508,9 @@ static void quantise_dwt_coefficients_perceptual_per_coeff(tav_encoder_t *enc, float *coeffs, int16_t *quantised, int size, int base_quantiser, int width, int height, int decomp_levels, int is_chroma, int frame_count); +static void quantise_3d_dwt_coefficients(tav_encoder_t *enc, + float **gop_coeffs, int16_t **quantised, int num_frames, + int spatial_size, int base_quantiser, int is_chroma); static size_t preprocess_coefficients_variable_layout(preprocess_mode_t preprocess_mode, int width, int height, int16_t *coeffs_y, int16_t *coeffs_co, int16_t *coeffs_cg, int16_t *coeffs_alpha, int coeff_count, int channel_layout, uint8_t *output_buffer); @@ -2436,6 +2563,7 @@ static void show_usage(const char *program_name) { printf(" --preset PRESET Encoder presets (comma-separated, e.g., 'sports,anime'):\n"); printf(" sports (or sport): Finer temporal quantisation for better motion detail\n"); printf(" anime (or animation): Disable grain synthesis for cleaner animated content\n"); + printf(" --threads N Number of worker threads for parallel GOP encoding (default: 1, requires --3d-dwt)\n"); printf(" --help Show this help\n\n"); printf("Audio Rate by Quality:\n "); @@ -2844,6 +2972,1005 @@ static int initialise_encoder(tav_encoder_t *enc) { return 0; } +// ============================================================================= +// Multi-Threading Implementation - Circular Buffer Management +// ============================================================================= + +/** + * Initialize GOP slots for circular buffer + * Allocates num_slots slots with frame buffers sized for width×height×capacity + */ +static gop_slot_t* init_gop_slots(int num_slots, int width, int height, int capacity) { + gop_slot_t *slots = calloc(num_slots, sizeof(gop_slot_t)); + if (!slots) { + fprintf(stderr, "Error: Cannot allocate %d GOP slots\n", num_slots); + return NULL; + } + + size_t frame_rgb_size = width * height * 3; + size_t max_audio_samples = capacity * 32016 * 2; // Worst case: capacity frames @ 32016 samples/frame stereo + + for (int i = 0; i < num_slots; i++) { + gop_slot_t *slot = &slots[i]; + + // Initialize status and synchronization + slot->status = GOP_STATUS_EMPTY; + slot->gop_index = -1; + mtx_init(&slot->mutex, mtx_plain); + cnd_init(&slot->status_changed); + + // NO LONGER ALLOCATE rgb_frames - use global frame pool instead + slot->frame_start_index = -1; // Will be set by producer + slot->num_frames = 0; + + // Allocate frame numbers array + slot->frame_numbers = calloc(capacity, sizeof(int)); + if (!slot->frame_numbers) { + fprintf(stderr, "Error: Cannot allocate GOP slot %d metadata\n", i); + destroy_gop_slots(slots, i, capacity); // Cleanup partial allocation + return NULL; + } + + // Allocate audio buffer + slot->pcm_samples = malloc(max_audio_samples * sizeof(float)); + if (!slot->pcm_samples) { + fprintf(stderr, "Error: Cannot allocate GOP slot %d audio buffer\n", i); + destroy_gop_slots(slots, i + 1, capacity); + return NULL; + } + + // Initialize output pointers as NULL + slot->video_packet = NULL; + slot->audio_packets = NULL; + slot->audio_packet_sizes = NULL; + slot->num_audio_packets = 0; + slot->encoding_failed = 0; + } + + // Memory calculation (excluding frame buffers which are now in global pool) + size_t total_memory = num_slots * (max_audio_samples * sizeof(float)); + printf("Allocated %d GOP slots (%.1f MB total, excluding global frame pool)\n", num_slots, total_memory / (1024.0 * 1024.0)); + + return slots; +} + +/** + * Get next empty slot from circular buffer + * Blocks if all slots are busy (producer flow control) + */ +static gop_slot_t* get_empty_slot(thread_pool_t *pool, int *slot_index) { + mtx_lock(&pool->job_queue_mutex); + + while (1) { + // Search for empty slot + for (int i = 0; i < pool->num_slots; i++) { + gop_slot_t *slot = &pool->slots[i]; + mtx_lock(&slot->mutex); + if (slot->status == GOP_STATUS_EMPTY) { + slot->status = GOP_STATUS_FILLING; + *slot_index = i; + mtx_unlock(&slot->mutex); + mtx_unlock(&pool->job_queue_mutex); + return slot; + } + mtx_unlock(&slot->mutex); + } + + // No empty slots, wait for writer to free one + cnd_wait(&pool->slot_available, &pool->job_queue_mutex); + + // Check shutdown + if (pool->shutdown) { + mtx_unlock(&pool->job_queue_mutex); + return NULL; + } + } +} + +/** + * Free GOP slot contents and mark as empty + * Called by writer after outputting GOP + */ +static void free_gop_slot(gop_slot_t *slot) { + mtx_lock(&slot->mutex); + + // Free output data + if (slot->video_packet) { + free(slot->video_packet); + slot->video_packet = NULL; + } + + if (slot->audio_packets) { + for (int i = 0; i < slot->num_audio_packets; i++) { + free(slot->audio_packets[i]); + } + free(slot->audio_packets); + slot->audio_packets = NULL; + } + + if (slot->audio_packet_sizes) { + free(slot->audio_packet_sizes); + slot->audio_packet_sizes = NULL; + } + + slot->num_audio_packets = 0; + slot->num_frames = 0; + slot->num_audio_samples = 0; + slot->gop_index = -1; + slot->encoding_failed = 0; + slot->error_message[0] = '\0'; + + // Mark as empty and signal while holding mutex + slot->status = GOP_STATUS_EMPTY; + cnd_signal(&slot->status_changed); + mtx_unlock(&slot->mutex); +} + +/** + * Destroy all GOP slots and free memory + */ +static void destroy_gop_slots(gop_slot_t *slots, int num_slots, int capacity) { + if (!slots) return; + + for (int i = 0; i < num_slots; i++) { + gop_slot_t *slot = &slots[i]; + + // NO LONGER FREE rgb_frames (using global frame pool instead) + + free(slot->frame_numbers); + free(slot->pcm_samples); + + // Free output data + free(slot->video_packet); + if (slot->audio_packets) { + for (int j = 0; j < slot->num_audio_packets; j++) { + free(slot->audio_packets[j]); + } + free(slot->audio_packets); + } + free(slot->audio_packet_sizes); + + // Destroy synchronization primitives + mtx_destroy(&slot->mutex); + cnd_destroy(&slot->status_changed); + } + + free(slots); +} + +// ============================================================================= +// Multi-Threading - Thread Functions (Forward Declarations) +// ============================================================================= + +static int worker_thread_main(void *arg); +static int producer_thread_main(void *arg); +static int writer_thread_main(void *arg); + +// ============================================================================= +// Multi-Threading - Thread Pool Lifecycle +// ============================================================================= + +/** + * Count total GOPs in the boundary linked list + */ +static int count_total_gops(gop_boundary_t *gop_boundaries) { + int count = 0; + gop_boundary_t *current = gop_boundaries; + while (current) { + count++; + current = current->next; + } + return count; +} + +/** + * Create thread pool with worker threads + * total_gops: Total number of GOPs to allocate slots for (upfront allocation) + * Returns NULL on failure + */ +static thread_pool_t* create_thread_pool(tav_encoder_t *enc, int num_threads, int total_gops) { + if (num_threads < 2) return NULL; + + thread_pool_t *pool = calloc(1, sizeof(thread_pool_t)); + if (!pool) { + fprintf(stderr, "Error: Cannot allocate thread pool\n"); + return NULL; + } + + pool->num_threads = num_threads; + pool->num_slots = total_gops; // UPFRONT ALLOCATION: one slot per GOP, no circular reuse + pool->slot_capacity = TEMPORAL_GOP_SIZE; + pool->shared_enc = enc; + pool->shutdown = 0; + pool->producer_finished = 0; + pool->next_gop_to_fill = 0; + pool->total_gops_produced = 0; + pool->total_frames_produced = 0; + pool->next_gop_to_write = 0; + pool->total_gops_written = 0; + + // Initialize job queue + pool->job_queue_capacity = pool->num_slots * 2; + pool->job_queue = calloc(pool->job_queue_capacity, sizeof(int)); + pool->job_queue_head = 0; + pool->job_queue_tail = 0; + pool->job_queue_size = 0; + + // Initialize synchronization primitives + mtx_init(&pool->job_queue_mutex, mtx_plain); + cnd_init(&pool->job_available); + cnd_init(&pool->slot_available); + + // GLOBAL FRAME POOL: Calculate total frames needed from GOP boundaries + int total_frames = 0; + gop_boundary_t *boundary = enc->gop_boundaries; + while (boundary) { + int gop_frames = (boundary->end_frame - boundary->start_frame) + 1; + total_frames += gop_frames; + boundary = boundary->next; + } + + printf("Allocating global frame pool: %d frames\n", total_frames); + + // Allocate global frame pool + pool->total_frames = total_frames; + pool->frame_rgb_size = enc->width * enc->height * 3; + pool->global_frame_pool = calloc(total_frames, sizeof(uint8_t*)); + if (!pool->global_frame_pool) { + fprintf(stderr, "Error: Failed to allocate global frame pool array\n"); + free(pool->job_queue); + mtx_destroy(&pool->job_queue_mutex); + cnd_destroy(&pool->job_available); + cnd_destroy(&pool->slot_available); + free(pool); + return NULL; + } + + // Allocate each frame buffer + for (int i = 0; i < total_frames; i++) { + pool->global_frame_pool[i] = malloc(pool->frame_rgb_size); + if (!pool->global_frame_pool[i]) { + fprintf(stderr, "Error: Failed to allocate frame buffer %d/%d\n", i, total_frames); + // Free already allocated frames + for (int j = 0; j < i; j++) { + free(pool->global_frame_pool[j]); + } + free(pool->global_frame_pool); + free(pool->job_queue); + mtx_destroy(&pool->job_queue_mutex); + cnd_destroy(&pool->job_available); + cnd_destroy(&pool->slot_available); + free(pool); + return NULL; + } + } + + printf("Global frame pool allocated: %zu MB\n", + (total_frames * pool->frame_rgb_size) / (1024*1024)); + + // Allocate GOP slots + pool->slots = init_gop_slots(pool->num_slots, enc->width, enc->height, pool->slot_capacity); + if (!pool->slots) { + fprintf(stderr, "Error: Failed to allocate GOP slots\n"); + free(pool->job_queue); + mtx_destroy(&pool->job_queue_mutex); + cnd_destroy(&pool->job_available); + cnd_destroy(&pool->slot_available); + free(pool); + return NULL; + } + + // Allocate worker thread handles + pool->worker_threads = calloc(num_threads, sizeof(thrd_t)); + if (!pool->worker_threads) { + fprintf(stderr, "Error: Cannot allocate worker thread handles\n"); + destroy_gop_slots(pool->slots, pool->num_slots, pool->slot_capacity); + free(pool->job_queue); + mtx_destroy(&pool->job_queue_mutex); + cnd_destroy(&pool->job_available); + cnd_destroy(&pool->slot_available); + free(pool); + return NULL; + } + + // Create worker threads with contexts + for (int i = 0; i < num_threads; i++) { + thread_encoder_context_t *ctx = calloc(1, sizeof(thread_encoder_context_t)); + if (!ctx) { + fprintf(stderr, "Error: Cannot allocate worker context %d\n", i); + pool->shutdown = 1; + // Wait for already-created threads to exit + for (int j = 0; j < i; j++) { + thrd_join(pool->worker_threads[j], NULL); + } + destroy_gop_slots(pool->slots, pool->num_slots, pool->slot_capacity); + free(pool->worker_threads); + free(pool->job_queue); + mtx_destroy(&pool->job_queue_mutex); + cnd_destroy(&pool->job_available); + cnd_destroy(&pool->slot_available); + free(pool); + return NULL; + } + + ctx->thread_id = i; + ctx->pool = pool; + ctx->max_gop_frames = pool->slot_capacity; + ctx->max_frame_pixels = enc->width * enc->height; + + // Allocate thread-local work buffers + size_t total_pixels = ctx->max_gop_frames * ctx->max_frame_pixels; + ctx->work_y_frames = calloc(ctx->max_gop_frames, sizeof(float*)); + ctx->work_co_frames = calloc(ctx->max_gop_frames, sizeof(float*)); + ctx->work_cg_frames = calloc(ctx->max_gop_frames, sizeof(float*)); + ctx->quantized_y = calloc(ctx->max_gop_frames, sizeof(int16_t*)); + ctx->quantized_co = calloc(ctx->max_gop_frames, sizeof(int16_t*)); + ctx->quantized_cg = calloc(ctx->max_gop_frames, sizeof(int16_t*)); + + for (int j = 0; j < ctx->max_gop_frames; j++) { + ctx->work_y_frames[j] = malloc(ctx->max_frame_pixels * sizeof(float)); + ctx->work_co_frames[j] = malloc(ctx->max_frame_pixels * sizeof(float)); + ctx->work_cg_frames[j] = malloc(ctx->max_frame_pixels * sizeof(float)); + ctx->quantized_y[j] = malloc(ctx->max_frame_pixels * sizeof(int16_t)); + ctx->quantized_co[j] = malloc(ctx->max_frame_pixels * sizeof(int16_t)); + ctx->quantized_cg[j] = malloc(ctx->max_frame_pixels * sizeof(int16_t)); + } + + ctx->compression_buffer_size = total_pixels * 3; + ctx->compression_buffer = malloc(ctx->compression_buffer_size); + ctx->zstd_ctx = ZSTD_createCCtx(); + + // Create worker thread + if (thrd_create(&pool->worker_threads[i], worker_thread_main, ctx) != thrd_success) { + fprintf(stderr, "Error: Failed to create worker thread %d\n", i); + // Cleanup context + for (int k = 0; k < ctx->max_gop_frames; k++) { + free(ctx->work_y_frames[k]); + free(ctx->work_co_frames[k]); + free(ctx->work_cg_frames[k]); + free(ctx->quantized_y[k]); + free(ctx->quantized_co[k]); + free(ctx->quantized_cg[k]); + } + free(ctx->work_y_frames); + free(ctx->work_co_frames); + free(ctx->work_cg_frames); + free(ctx->quantized_y); + free(ctx->quantized_co); + free(ctx->quantized_cg); + free(ctx->compression_buffer); + ZSTD_freeCCtx(ctx->zstd_ctx); + free(ctx); + + pool->shutdown = 1; + for (int j = 0; j < i; j++) { + thrd_join(pool->worker_threads[j], NULL); + } + destroy_gop_slots(pool->slots, pool->num_slots, pool->slot_capacity); + free(pool->worker_threads); + free(pool->job_queue); + mtx_destroy(&pool->job_queue_mutex); + cnd_destroy(&pool->job_available); + cnd_destroy(&pool->slot_available); + free(pool); + return NULL; + } + } + + printf("Created thread pool: %d workers, %d GOP slots\n", num_threads, pool->num_slots); + return pool; +} + +/** + * Shutdown thread pool and wait for all threads to complete + */ +static void shutdown_thread_pool(thread_pool_t *pool) { + if (!pool) return; + + // Signal shutdown + mtx_lock(&pool->job_queue_mutex); + pool->shutdown = 1; + cnd_broadcast(&pool->job_available); + cnd_broadcast(&pool->slot_available); + mtx_unlock(&pool->job_queue_mutex); + + // Wait for producer thread + if (pool->producer_finished != 0) { // Only join if started + thrd_join(pool->producer_thread, NULL); + } + + // Wait for all worker threads + for (int i = 0; i < pool->num_threads; i++) { + int result; + thrd_join(pool->worker_threads[i], &result); + } + + // Wait for writer thread + if (pool->total_gops_written > 0 || pool->total_gops_produced > 0) { // Only join if started + thrd_join(pool->writer_thread, NULL); + } + + // Destroy slots + destroy_gop_slots(pool->slots, pool->num_slots, pool->slot_capacity); + + // Free global frame pool + if (pool->global_frame_pool) { + for (int i = 0; i < pool->total_frames; i++) { + free(pool->global_frame_pool[i]); + } + free(pool->global_frame_pool); + } + + // Free job queue + free(pool->job_queue); + + // Destroy synchronization primitives + mtx_destroy(&pool->job_queue_mutex); + cnd_destroy(&pool->job_available); + cnd_destroy(&pool->slot_available); + + // Free thread handles + free(pool->worker_threads); + + free(pool); + printf("Thread pool shutdown complete\n"); +} + +// ============================================================================= +// Multi-Threading - Worker Thread Helper Functions +// ============================================================================= + +// Note: Most encoding functions are already declared in the main function prototypes section (lines 2485+) +// Forward declaration for rgb_to_colour_space_frame (defined later in the file) +static void rgb_to_colour_space_frame(tav_encoder_t *enc, const uint8_t *rgb, + float *c1, float *c2, float *c3, int width, int height); + +// ============================================================================= +// Multi-Threading - Worker Thread +// ============================================================================= + +/** + * Worker thread: Dequeue jobs and encode GOPs + */ +static int worker_thread_main(void *arg) { + thread_encoder_context_t *ctx = (thread_encoder_context_t*)arg; + thread_pool_t *pool = ctx->pool; + tav_encoder_t *enc = pool->shared_enc; + + int jobs_processed = 0; + + while (1) { + // Dequeue next job + mtx_lock(&pool->job_queue_mutex); + while (pool->job_queue_size == 0 && !pool->shutdown && pool->producer_finished == 0) { + cnd_wait(&pool->job_available, &pool->job_queue_mutex); + } + + if (pool->shutdown) { + mtx_unlock(&pool->job_queue_mutex); + break; + } + + if (pool->job_queue_size == 0 && pool->producer_finished != 0) { + // No more jobs and producer done + mtx_unlock(&pool->job_queue_mutex); + break; + } + + // Get job + int slot_idx = pool->job_queue[pool->job_queue_head]; + pool->job_queue_head = (pool->job_queue_head + 1) % pool->job_queue_capacity; + pool->job_queue_size--; + mtx_unlock(&pool->job_queue_mutex); + + gop_slot_t *slot = &pool->slots[slot_idx]; + + // Mark as encoding + mtx_lock(&slot->mutex); + slot->status = GOP_STATUS_ENCODING; + slot->encoding_failed = 0; + mtx_unlock(&slot->mutex); + + // === VIDEO ENCODING === + int num_frames = slot->num_frames; + int width = slot->width; + int height = slot->height; + int num_pixels = width * height; + + printf("worker_thread slot_idx=%d, num_frames=%d\n", slot_idx, num_frames); + + // Step 1: Convert RGB to YCoCg-R (or ICtCp) + // Access frames from global pool via slot's frame range + for (int i = 0; i < num_frames; i++) { + uint8_t *rgb_frame = pool->global_frame_pool[slot->frame_start_index + i]; + + rgb_to_colour_space_frame(enc, rgb_frame, + ctx->work_y_frames[i], ctx->work_co_frames[i], ctx->work_cg_frames[i], + width, height); + } + + // Step 2: Apply DWT + if (num_frames == 1) { + // Single-frame: 2D DWT only + dwt_2d_forward_flexible(enc, ctx->work_y_frames[0], width, height, + enc->decomp_levels, enc->wavelet_filter); + dwt_2d_forward_flexible(enc, ctx->work_co_frames[0], width, height, + enc->decomp_levels, enc->wavelet_filter); + dwt_2d_forward_flexible(enc, ctx->work_cg_frames[0], width, height, + enc->decomp_levels, enc->wavelet_filter); + } else { + // Multi-frame: 3D DWT (call for each channel separately) + dwt_3d_forward(enc, ctx->work_y_frames, width, height, num_frames, + enc->decomp_levels, enc->temporal_decomp_levels, enc->wavelet_filter); + dwt_3d_forward(enc, ctx->work_co_frames, width, height, num_frames, + enc->decomp_levels, enc->temporal_decomp_levels, enc->wavelet_filter); + dwt_3d_forward(enc, ctx->work_cg_frames, width, height, num_frames, + enc->decomp_levels, enc->temporal_decomp_levels, enc->wavelet_filter); + } + + // Step 3: Quantize coefficients (using 3D DWT quantization for GOP) + // Use channel-specific quantisers from encoder settings + // Apply QLUT mapping to chroma quantisers (matches single-threaded path) + int base_quantiser_y = enc->quantiser_y; + int base_quantiser_co = QLUT[enc->quantiser_co]; // Co quantiser from encoder (via QLUT) + int base_quantiser_cg = QLUT[enc->quantiser_cg]; // Cg quantiser from encoder (via QLUT) + + // Quantise 3D DWT coefficients with temporal-spatial quantisation + // This applies temporal scaling based on subband level and spatial perceptual weighting + quantise_3d_dwt_coefficients(enc, ctx->work_y_frames, ctx->quantized_y, num_frames, + num_pixels, base_quantiser_y, 0); // Luma + quantise_3d_dwt_coefficients(enc, ctx->work_co_frames, ctx->quantized_co, num_frames, + num_pixels, base_quantiser_co, 1); // Chroma Co + quantise_3d_dwt_coefficients(enc, ctx->work_cg_frames, ctx->quantized_cg, num_frames, + num_pixels, base_quantiser_cg, 1); // Chroma Cg + + // Step 4: EZBC preprocessing + // Allocate preprocessed buffer (max size estimation) + size_t max_preprocessed_size = num_pixels * num_frames * 3 * sizeof(int16_t); + uint8_t *preprocessed_buffer = malloc(max_preprocessed_size); + + size_t preprocessed_size = preprocess_gop_unified( + enc->preprocess_mode, // Use encoder's preprocess mode (EZBC by default) + ctx->quantized_y, ctx->quantized_co, ctx->quantized_cg, + num_frames, num_pixels, width, height, + CHANNEL_LAYOUT_YCOCG, // Standard YCoCg layout + preprocessed_buffer + ); + + // Step 5: Zstd compress + size_t compressed_bound = ZSTD_compressBound(preprocessed_size); + if (compressed_bound > ctx->compression_buffer_size) { + ctx->compression_buffer_size = compressed_bound * 2; + ctx->compression_buffer = realloc(ctx->compression_buffer, ctx->compression_buffer_size); + } + + size_t compressed_size = ZSTD_compressCCtx(ctx->zstd_ctx, + ctx->compression_buffer, compressed_bound, + preprocessed_buffer, preprocessed_size, + enc->zstd_level); + + free(preprocessed_buffer); + + if (ZSTD_isError(compressed_size)) { + mtx_lock(&slot->mutex); + slot->encoding_failed = 1; + snprintf(slot->error_message, 256, "Zstd compression failed"); + slot->status = GOP_STATUS_COMPLETE; + cnd_signal(&slot->status_changed); + mtx_unlock(&slot->mutex); + continue; + } + + // Step 6: Format video packet + uint8_t packet_type = (num_frames == 1) ? TAV_PACKET_IFRAME : TAV_PACKET_GOP_UNIFIED; + + if (num_frames == 1) { + // I-frame packet: [type(1)][size(4)][data(N)] + size_t packet_size = 1 + 4 + compressed_size; + slot->video_packet = malloc(packet_size); + slot->video_packet[0] = packet_type; + uint32_t size_field = (uint32_t)compressed_size; + memcpy(slot->video_packet + 1, &size_field, 4); + memcpy(slot->video_packet + 5, ctx->compression_buffer, compressed_size); + slot->video_packet_size = packet_size; + } else { + // GOP packet: [type(1)][gop_size(1)][size(4)][data(N)] + size_t packet_size = 1 + 1 + 4 + compressed_size; + slot->video_packet = malloc(packet_size); + slot->video_packet[0] = packet_type; + slot->video_packet[1] = (uint8_t)num_frames; // GOP size + uint32_t size_field = (uint32_t)compressed_size; + memcpy(slot->video_packet + 2, &size_field, 4); + memcpy(slot->video_packet + 6, ctx->compression_buffer, compressed_size); + slot->video_packet_size = packet_size; + } + + // === AUDIO ENCODING === + if (enc->tad_audio && slot->num_audio_samples > 0) { + // TAD encoding + int max_index = tad32_quality_to_max_index(enc->quality_level); + size_t tad_size = tad32_encode_chunk( + slot->pcm_samples, + slot->num_audio_samples, + max_index, + 1.0f, + ctx->compression_buffer + ); + + // Parse TAD chunk format: [sample_count(2)][quant_index(1)][payload_size(4)][payload(N)] + uint8_t *read_ptr = ctx->compression_buffer; + uint16_t sample_count = *((uint16_t*)read_ptr); + read_ptr += sizeof(uint16_t); + uint8_t quant_size = *((uint8_t*)read_ptr); + read_ptr += sizeof(uint8_t); + uint32_t tad_payload_size = *((uint32_t*)read_ptr); + read_ptr += sizeof(uint32_t); + uint8_t *tad_payload = read_ptr; + + // Format TAV packet 0x24: [0x24][sample_count(2)][payload_size+7(4)][sample_count(2)][quant_index(1)][compressed_size(4)][compressed_data(N)] + slot->num_audio_packets = 1; + slot->audio_packets = malloc(sizeof(uint8_t*)); + slot->audio_packet_sizes = malloc(sizeof(size_t)); + + uint32_t tav_payload_size_plus_7 = tad_payload_size + 7; + size_t audio_packet_size = 1 + 2 + 4 + 2 + 1 + 4 + tad_payload_size; + slot->audio_packets[0] = malloc(audio_packet_size); + + uint8_t *write_ptr = slot->audio_packets[0]; + *write_ptr++ = TAV_PACKET_AUDIO_TAD; + memcpy(write_ptr, &sample_count, sizeof(uint16_t)); write_ptr += 2; + memcpy(write_ptr, &tav_payload_size_plus_7, sizeof(uint32_t)); write_ptr += 4; + memcpy(write_ptr, &sample_count, sizeof(uint16_t)); write_ptr += 2; + memcpy(write_ptr, &quant_size, sizeof(uint8_t)); write_ptr += 1; + memcpy(write_ptr, &tad_payload_size, sizeof(uint32_t)); write_ptr += 4; + memcpy(write_ptr, tad_payload, tad_payload_size); + + slot->audio_packet_sizes[0] = audio_packet_size; + + } else if (enc->pcm8_audio && slot->num_audio_samples > 0) { + // PCM8 encoding (simplified - single packet) + size_t pcm8_samples = slot->num_audio_samples * 2; // Stereo + uint8_t *pcm8_data = malloc(pcm8_samples); + + // Convert PCM32f to PCM8 (simplified - no dithering) + for (size_t i = 0; i < pcm8_samples; i++) { + int16_t sample = (int16_t)(slot->pcm_samples[i] * 127.0f); + pcm8_data[i] = (uint8_t)((sample >> 8) + 128); + } + + slot->num_audio_packets = 1; + slot->audio_packets = malloc(sizeof(uint8_t*)); + slot->audio_packet_sizes = malloc(sizeof(size_t)); + + size_t audio_packet_size = 1 + 4 + pcm8_samples; + slot->audio_packets[0] = malloc(audio_packet_size); + slot->audio_packets[0][0] = TAV_PACKET_AUDIO_PCM8; + uint32_t pcm_size = (uint32_t)pcm8_samples; + memcpy(slot->audio_packets[0] + 1, &pcm_size, 4); + memcpy(slot->audio_packets[0] + 5, pcm8_data, pcm8_samples); + slot->audio_packet_sizes[0] = audio_packet_size; + + free(pcm8_data); + } else { + slot->num_audio_packets = 0; + } + + // Mark complete and signal while holding mutex to prevent lost wakeups + mtx_lock(&slot->mutex); + slot->status = GOP_STATUS_COMPLETE; + cnd_signal(&slot->status_changed); + mtx_unlock(&slot->mutex); + + jobs_processed++; + + if (enc->verbose && jobs_processed % 10 == 0) { + printf("Worker %d: Encoded GOP %d (%d frames, %zu KB)\n", + ctx->thread_id, slot->gop_index, num_frames, + slot->video_packet_size / 1024); + } + } + + // Cleanup context + for (int i = 0; i < ctx->max_gop_frames; i++) { + free(ctx->work_y_frames[i]); + free(ctx->work_co_frames[i]); + free(ctx->work_cg_frames[i]); + free(ctx->quantized_y[i]); + free(ctx->quantized_co[i]); + free(ctx->quantized_cg[i]); + } + free(ctx->work_y_frames); + free(ctx->work_co_frames); + free(ctx->work_cg_frames); + // Save thread_id before freeing context + int thread_id = ctx->thread_id; + + free(ctx->quantized_y); + free(ctx->quantized_co); + free(ctx->quantized_cg); + free(ctx->compression_buffer); + ZSTD_freeCCtx(ctx->zstd_ctx); + free(ctx); + + printf("Worker %d complete: %d GOPs encoded\n", thread_id, jobs_processed); + return 0; +} + +// ============================================================================= +// Multi-Threading - Producer Thread +// ============================================================================= + +/** + * Producer thread: Read frames from FFmpeg and fill GOP slots + */ +/** + * Producer thread: Read frames from FFmpeg and fill GOP slots according to GOP boundaries + * UPFRONT ALLOCATION VERSION: Uses slot index = GOP index, no circular buffer reuse + */ +static int producer_thread_main(void *arg) { + thread_pool_t *pool = (thread_pool_t*)arg; + tav_encoder_t *enc = pool->shared_enc; + + printf("Producer thread starting (global frame pool mode)\n"); + printf(" Total GOP slots: %d\n", pool->num_slots); + printf(" Total frames to read: %d\n", pool->total_frames); + + // STEP 1: Read ALL frames sequentially into global pool + printf("Reading all frames into global pool...\n"); + for (int i = 0; i < pool->total_frames; i++) { + size_t bytes_read = fread(pool->global_frame_pool[i], 1, + pool->frame_rgb_size, enc->ffmpeg_video_pipe); + + if (bytes_read != pool->frame_rgb_size) { + if (feof(enc->ffmpeg_video_pipe)) { + fprintf(stderr, "WARNING: EOF at frame %d (expected %d frames)\n", + i, pool->total_frames); + pool->total_frames = i; // Adjust total + break; + } else { + fprintf(stderr, "Error: FFmpeg pipe read failed at frame %d\n", i); + mtx_lock(&pool->job_queue_mutex); + pool->producer_finished = -1; + pool->shutdown = 1; + cnd_broadcast(&pool->job_available); + mtx_unlock(&pool->job_queue_mutex); + return -1; + } + } + + if ((i + 1) % 500 == 0) { + printf(" Read %d frames...\n", i + 1); + } + } + + printf("All frames read into global pool: %d frames\n", pool->total_frames); + + // STEP 2: Assign frame ranges to GOP slots + int global_frame_idx = 0; + int gop_index = 0; + gop_boundary_t *current_gop_boundary = enc->gop_boundaries; + + while (current_gop_boundary && gop_index < pool->num_slots) { + gop_slot_t *slot = &pool->slots[gop_index]; + + int expected_frames = current_gop_boundary->num_frames; + if (expected_frames <= 0) { + expected_frames = (current_gop_boundary->end_frame - current_gop_boundary->start_frame) + 1; + } + + // Check we haven't exceeded total frames + if (global_frame_idx + expected_frames > pool->total_frames) { + fprintf(stderr, "WARNING: GOP %d would exceed total frames (%d + %d > %d)\n", + gop_index, global_frame_idx, expected_frames, pool->total_frames); + expected_frames = pool->total_frames - global_frame_idx; + if (expected_frames <= 0) break; + } + + // Initialize slot with frame range + mtx_lock(&slot->mutex); + slot->gop_index = gop_index; + slot->frame_start_index = global_frame_idx; // Start index in global pool + slot->num_frames = expected_frames; + slot->width = enc->width; + slot->height = enc->height; + slot->status = GOP_STATUS_FILLING; + + // Set frame numbers for timecodes + for (int i = 0; i < expected_frames; i++) { + slot->frame_numbers[i] = current_gop_boundary->start_frame + i; + } + mtx_unlock(&slot->mutex); + + // Read audio for this GOP + if (enc->pcm_file && (enc->tad_audio || enc->pcm8_audio)) { + size_t total_audio_samples = expected_frames * enc->samples_per_frame; + size_t audio_bytes = total_audio_samples * 2 * sizeof(float); + + size_t audio_read = fread(slot->pcm_samples, 1, audio_bytes, enc->pcm_file); + if (audio_read == audio_bytes) { + slot->num_audio_samples = total_audio_samples; + } else { + fprintf(stderr, "Warning: Incomplete audio read for GOP %d\n", gop_index); + slot->num_audio_samples = audio_read / (2 * sizeof(float)); + } + } + + // Enqueue GOP for workers + mtx_lock(&pool->job_queue_mutex); + pool->job_queue[pool->job_queue_tail] = gop_index; + pool->job_queue_tail = (pool->job_queue_tail + 1) % pool->job_queue_capacity; + pool->job_queue_size++; + pool->total_gops_produced++; + pool->total_frames_produced += expected_frames; + + // Mark slot as ready + mtx_lock(&slot->mutex); + slot->status = GOP_STATUS_READY; + mtx_unlock(&slot->mutex); + + cnd_broadcast(&pool->job_available); + mtx_unlock(&pool->job_queue_mutex); + + if ((gop_index + 1) % 50 == 0) { + printf("Producer: Assigned %d GOPs\n", gop_index + 1); + } + + // Move to next GOP + global_frame_idx += expected_frames; + current_gop_boundary = current_gop_boundary->next; + gop_index++; + } + + // Signal completion + mtx_lock(&pool->job_queue_mutex); + pool->producer_finished = 1; + cnd_broadcast(&pool->job_available); + mtx_unlock(&pool->job_queue_mutex); + + printf("Producer thread complete: %d frames read, %d GOPs assigned\n", + pool->total_frames, pool->total_gops_produced); + return 0; +} + +// ============================================================================= +// Multi-Threading - Writer Thread +// ============================================================================= + +/** + * Writer thread: Sequentially write completed GOPs to output file + */ +static int writer_thread_main(void *arg) { + thread_pool_t *pool = (thread_pool_t*)arg; + tav_encoder_t *enc = pool->shared_enc; + FILE *output = enc->output_fp; + int gop_index = 0; + + while (1) { + // Find slot for next sequential GOP + gop_slot_t *slot = NULL; + int slot_idx = -1; + for (int i = 0; i < pool->num_slots; i++) { + mtx_lock(&pool->slots[i].mutex); + if (pool->slots[i].gop_index == gop_index) { + slot = &pool->slots[i]; + slot_idx = i; + mtx_unlock(&pool->slots[i].mutex); + break; + } + mtx_unlock(&pool->slots[i].mutex); + } + + if (!slot) { + // GOP not yet produced, check if we're done + mtx_lock(&pool->job_queue_mutex); + int finished = pool->producer_finished; + int total_produced = pool->total_gops_produced; + int total_written = pool->total_gops_written; + mtx_unlock(&pool->job_queue_mutex); + + if ((finished == 1 || finished == -1) && total_written >= total_produced) { + // Producer done and all GOPs written + printf("Writer: Exiting (finished=%d, written=%d, produced=%d)\n", + finished, total_written, total_produced); + break; + } + + // Wait a bit and retry + if (gop_index % 10 == 0 || gop_index > 230) { + printf("Writer: Waiting for GOP %d (finished=%d, written=%d, produced=%d)\n", + gop_index, finished, total_written, total_produced); + } + thrd_sleep(&(struct timespec){.tv_nsec=1000000}, NULL); // 1ms + continue; + } + + // Wait for slot to complete encoding + mtx_lock(&slot->mutex); + while (slot->status != GOP_STATUS_COMPLETE) { + if (pool->shutdown) { + mtx_unlock(&slot->mutex); + return 0; + } + cnd_wait(&slot->status_changed, &slot->mutex); + } + + // Check for encoding errors + if (slot->encoding_failed) { + fprintf(stderr, "Error: GOP %d encoding failed: %s\n", + gop_index, slot->error_message); + mtx_unlock(&slot->mutex); + mtx_lock(&pool->job_queue_mutex); + pool->shutdown = 1; + mtx_unlock(&pool->job_queue_mutex); + return -1; + } + + // Write timecode packet for first frame in GOP + write_timecode_packet(output, slot->frame_numbers[0], + enc->output_fps, enc->is_ntsc_framerate); + + // Debug: Verify packet data before writing (first GOP only) + if (gop_index == 0 && enc->verbose) { + if (slot->num_audio_packets > 0) { + printf("[DEBUG] GOP 0 Audio packet 0: type=0x%02X, size=%zu, first_bytes=%02X %02X %02X %02X %02X\n", + slot->audio_packets[0][0], + slot->audio_packet_sizes[0], + slot->audio_packets[0][0], slot->audio_packets[0][1], + slot->audio_packets[0][2], slot->audio_packets[0][3], + slot->audio_packets[0][4]); + } + printf("[DEBUG] GOP 0 Video packet: type=0x%02X, size=%zu, first_bytes=%02X %02X %02X %02X %02X\n", + slot->video_packet[0], + slot->video_packet_size, + slot->video_packet[0], slot->video_packet[1], + slot->video_packet[2], slot->video_packet[3], + slot->video_packet[4]); + } + + // Write audio packets + for (int i = 0; i < slot->num_audio_packets; i++) { + fwrite(slot->audio_packets[i], 1, slot->audio_packet_sizes[i], output); + } + + // Write video packet + fwrite(slot->video_packet, 1, slot->video_packet_size, output); + + // Save values for progress reporting BEFORE freeing + int num_frames_written = slot->num_frames; + size_t video_size_written = slot->video_packet_size; + int audio_packets_written = slot->num_audio_packets; + + // Write GOP_SYNC packet + uint8_t gop_sync[2] = {TAV_PACKET_GOP_SYNC, (uint8_t)num_frames_written}; + fwrite(gop_sync, 1, 2, output); + + mtx_unlock(&slot->mutex); + + // UPFRONT ALLOCATION: Do NOT free slots - each slot is used exactly once for its GOP + // free_gop_slot(slot); // REMOVED: breaks upfront allocation model + mtx_lock(&pool->job_queue_mutex); + pool->total_gops_written++; + int gops_written = pool->total_gops_written; + // cnd_signal(&pool->slot_available); // REMOVED: not needed with upfront allocation + mtx_unlock(&pool->job_queue_mutex); + + // Progress reporting (using saved values) + if (enc->verbose) { + printf("Written GOP %d (%d frames, %zu KB video + %d audio packets)\n", + gop_index, num_frames_written, + video_size_written / 1024, + audio_packets_written); + } else if (!enc->verbose && gops_written % 50 == 0) { + printf("Progress: %d GOPs written\n", gops_written); + } + + gop_index++; + } + + printf("Writer thread complete: %d GOPs written\n", pool->total_gops_written); + return 0; +} + // ============================================================================= // DWT Implementation - 5/3 Reversible and 9/7 Irreversible Filters // ============================================================================= @@ -10784,6 +11911,7 @@ int main(int argc, char *argv[]) { {"single-pass", no_argument, 0, 1050}, // disable two-pass encoding with wavelet-based scene detection {"preset", required_argument, 0, 1051}, // Encoder presets: sports, anime (comma-separated) {"enable-crop-encoding", no_argument, 0, 1052}, // Phase 2: encode cropped active region only (experimental) + {"threads", required_argument, 0, 1060}, // Multi-threading: number of worker threads {"help", no_argument, 0, '?'}, {0, 0, 0, 0} }; @@ -11052,6 +12180,24 @@ int main(int argc, char *argv[]) { enc->enable_crop_encoding = 1; printf("Phase 2 crop encoding enabled (experimental)\n"); break; + case 1060: // --threads + enc->num_threads = atoi(optarg); + if (enc->num_threads < 1) enc->num_threads = 1; + + // Future: auto-detect with limit (currently commented out) + // if (enc->num_threads == 0) { + // #ifdef _WIN32 + // SYSTEM_INFO sysinfo; + // GetSystemInfo(&sysinfo); + // int cores = sysinfo.dwNumberOfProcessors; + // #else + // int cores = sysconf(_SC_NPROCESSORS_ONLN); + // #endif + // enc->num_threads = (cores > 8) ? 8 : cores; // Limit to 8 + // } + + printf("Multi-threading: %d threads\n", enc->num_threads); + break; case 'a': int bitrate = atoi(optarg); int valid_bitrate = validate_mp2_bitrate(bitrate); @@ -11299,12 +12445,25 @@ int main(int argc, char *argv[]) { // Two-pass mode: Run first pass for scene analysis if (enc->two_pass_mode) { + // Close the existing video pipe before first pass (it will be reopened after) + if (enc->ffmpeg_video_pipe) { + pclose(enc->ffmpeg_video_pipe); + enc->ffmpeg_video_pipe = NULL; + } + if (two_pass_first_pass(enc, enc->input_file) != 0) { fprintf(stderr, "Error: First pass failed\n"); cleanup_encoder(enc); return 1; } + // Reopen video pipe for second pass + if (start_video_conversion(enc) != 1) { + fprintf(stderr, "Error: Failed to restart video conversion for second pass\n"); + cleanup_encoder(enc); + return 1; + } + // Initialise GOP boundary iterator for second pass enc->current_gop_boundary = enc->gop_boundaries; enc->two_pass_current_frame = 0; @@ -11404,6 +12563,85 @@ int main(int argc, char *argv[]) { printf("Starting encoding...\n"); + // ========================================================================= + // MULTI-THREADED vs SINGLE-THREADED MODE DECISION + // ========================================================================= + + // Multi-threading with upfront allocation - one slot per GOP for correctness + if (enc->num_threads >= 2 && enc->enable_temporal_dwt) { + // === MULTI-THREADED MODE === + // Count total GOPs for upfront allocation + int total_gops = count_total_gops(enc->gop_boundaries); + printf("Using multi-threaded encoding: %d threads, %d GOP buffer slots (upfront allocation)\n", + enc->num_threads, total_gops); + + // Create thread pool with upfront GOP slot allocation + enc->thread_pool = create_thread_pool(enc, enc->num_threads, total_gops); + if (!enc->thread_pool) { + fprintf(stderr, "Error: Failed to create thread pool, falling back to single-threaded\n"); + enc->num_threads = 1; + goto single_threaded_mode; + } + + // Write initial timecode + write_timecode_packet(enc->output_fp, 0, enc->output_fps, enc->is_ntsc_framerate); + + // Start producer thread + if (thrd_create(&enc->thread_pool->producer_thread, producer_thread_main, + enc->thread_pool) != thrd_success) { + fprintf(stderr, "Error: Failed to create producer thread\n"); + shutdown_thread_pool(enc->thread_pool); + enc->thread_pool = NULL; + cleanup_encoder(enc); + return 1; + } + + // Start writer thread + if (thrd_create(&enc->thread_pool->writer_thread, writer_thread_main, + enc->thread_pool) != thrd_success) { + fprintf(stderr, "Error: Failed to create writer thread\n"); + mtx_lock(&enc->thread_pool->job_queue_mutex); + enc->thread_pool->shutdown = 1; + mtx_unlock(&enc->thread_pool->job_queue_mutex); + thrd_join(enc->thread_pool->producer_thread, NULL); + shutdown_thread_pool(enc->thread_pool); + enc->thread_pool = NULL; + cleanup_encoder(enc); + return 1; + } + + // Wait for writer to complete (it waits for producer and workers) + int writer_result; + thrd_join(enc->thread_pool->writer_thread, &writer_result); + + if (writer_result != 0) { + fprintf(stderr, "Error: Writer thread failed\n"); + shutdown_thread_pool(enc->thread_pool); + enc->thread_pool = NULL; + cleanup_encoder(enc); + return 1; + } + + printf("\nMulti-threaded encoding complete\n"); + printf(" Total GOPs produced: %d\n", enc->thread_pool->total_gops_produced); + printf(" Total GOPs written: %d\n", enc->thread_pool->total_gops_written); + printf(" Total frames produced: %d\n", enc->thread_pool->total_frames_produced); + + // Use total frame count from producer + int frame_count = enc->thread_pool->total_frames_produced; + + // Skip single-threaded main loop + goto encoding_complete; + + } else { + // === SINGLE-THREADED MODE === + if (enc->num_threads >= 2) { + printf("Note: Multi-threading requires --temporal-dwt mode\n"); + printf("Falling back to single-threaded encoding\n"); + } + } + +single_threaded_mode: // Main encoding loop - process frames until EOF or frame limit int frame_count = 0; int true_frame_count = 0; @@ -12144,27 +13382,29 @@ int main(int argc, char *argv[]) { } } - // Update actual frame count in encoder struct - enc->total_frames = frame_count; +encoding_complete: + // Update header with actual frame count (both single and multi-threaded paths reach here) + // Get actual frame count (from thread pool in multi-threaded mode, or from loop in single-threaded) + int actual_frame_count = (enc->thread_pool) ? enc->thread_pool->total_frames_produced : frame_count; + enc->total_frames = actual_frame_count; - // Update header with actual frame count (seek back to header position) if (enc->output_fp != stdout) { long current_pos = ftell(enc->output_fp); fseek(enc->output_fp, 14, SEEK_SET); // Offset of total_frames field in TAV header - uint32_t actual_frames = frame_count; + uint32_t actual_frames = actual_frame_count; fwrite(&actual_frames, sizeof(uint32_t), 1, enc->output_fp); fseek(enc->output_fp, current_pos, SEEK_SET); // Restore position if (enc->verbose) { - printf("Updated header with actual frame count: %d\n", frame_count); + printf("Updated header with actual frame count: %d\n", actual_frame_count); } // Update ENDT in extended header (calculate end time of video) uint64_t endt_ns; if (enc->is_ntsc_framerate) { // NTSC framerates use denominator 1001 (e.g., 24000/1001, 30000/1001, 60000/1001) - endt_ns = ((uint64_t)frame_count * 1001ULL * 1000000000ULL) / ((uint64_t)enc->output_fps * 1000ULL); + endt_ns = ((uint64_t)actual_frame_count * 1001ULL * 1000000000ULL) / ((uint64_t)enc->output_fps * 1000ULL); } else { - endt_ns = ((uint64_t)frame_count * 1000000000ULL) / (uint64_t)enc->output_fps; + endt_ns = ((uint64_t)actual_frame_count * 1000000000ULL) / (uint64_t)enc->output_fps; } fseek(enc->output_fp, enc->extended_header_offset, SEEK_SET); fwrite(&endt_ns, sizeof(uint64_t), 1, enc->output_fp); @@ -12181,7 +13421,12 @@ int main(int argc, char *argv[]) { (end_time.tv_usec - enc->start_time.tv_usec) / 1000000.0; printf("\nEncoding complete!\n"); - printf(" Frames encoded: %d\n", frame_count); + + // Frame count may be from single-threaded loop or multi-threaded mode + int total_frames = (enc->thread_pool) ? + (enc->thread_pool->total_gops_written * TEMPORAL_GOP_SIZE) : frame_count; + + printf(" Frames encoded: %d\n", total_frames); printf(" Framerate: %d\n", enc->output_fps); // Get actual output size from file position (includes all data: headers, video, audio, sync packets, etc.) @@ -12205,6 +13450,12 @@ int main(int argc, char *argv[]) { static void cleanup_encoder(tav_encoder_t *enc) { if (!enc) return; + // Clean up multi-threading resources + if (enc->thread_pool) { + shutdown_thread_pool(enc->thread_pool); + enc->thread_pool = NULL; + } + if (enc->ffmpeg_video_pipe) { pclose(enc->ffmpeg_video_pipe); }