From 34a1f0e3db19777d81f2c7af201c0ee6075331c4 Mon Sep 17 00:00:00 2001 From: minjaesong Date: Mon, 8 Dec 2025 11:06:03 +0900 Subject: [PATCH] tavenc: multithreaded encoding --- video_encoder/lib/libtavenc/tav_encoder_lib.c | 20 +- video_encoder/src/encoder_tav.c | 723 +++++++++++++++++- 2 files changed, 729 insertions(+), 14 deletions(-) diff --git a/video_encoder/lib/libtavenc/tav_encoder_lib.c b/video_encoder/lib/libtavenc/tav_encoder_lib.c index eacf583..84ad2ff 100644 --- a/video_encoder/lib/libtavenc/tav_encoder_lib.c +++ b/video_encoder/lib/libtavenc/tav_encoder_lib.c @@ -392,26 +392,26 @@ tav_encoder_context_t *tav_encoder_create(const tav_encoder_params_t *params) { // Auto mode: use monoblock for <= D1 PAL, tiled for larger if (ctx->width > TAV_MONOBLOCK_MAX_WIDTH || ctx->height > TAV_MONOBLOCK_MAX_HEIGHT) { ctx->monoblock = 0; -// if (ctx->verbose) { + if (ctx->verbose) { printf("Auto-selected Padded Tiling mode: %dx%d exceeds D1 PAL threshold (%dx%d)\n", ctx->width, ctx->height, TAV_MONOBLOCK_MAX_WIDTH, TAV_MONOBLOCK_MAX_HEIGHT); -// } + } } else { ctx->monoblock = 1; -// if (ctx->verbose) { + if (ctx->verbose) { printf("Auto-selected Monoblock mode: %dx%d within D1 PAL threshold\n", ctx->width, ctx->height); -// } + } } } else if (ctx->monoblock == 0) { -// if (ctx->verbose) { + if (ctx->verbose) { printf("Forced Padded Tiling mode (--tiled)\n"); -// } + } } else { // monoblock == 1: force monoblock even for large dimensions -// if (ctx->verbose) { + if (ctx->verbose) { printf("Forced Monoblock mode (--monoblock)\n"); -// } + } } // Calculate tile dimensions based on monoblock setting @@ -423,10 +423,10 @@ tav_encoder_context_t *tav_encoder_create(const tav_encoder_params_t *params) { // Padded Tiling mode: multiple tiles of TILE_SIZE_X × TILE_SIZE_Y ctx->tiles_x = (ctx->width + TAV_TILE_SIZE_X - 1) / TAV_TILE_SIZE_X; ctx->tiles_y = (ctx->height + TAV_TILE_SIZE_Y - 1) / TAV_TILE_SIZE_Y; -// if (ctx->verbose) { + if (ctx->verbose) { printf("Padded Tiling mode: %dx%d tiles (%d total)\n", ctx->tiles_x, ctx->tiles_y, ctx->tiles_x * ctx->tiles_y); -// } + } } // Calculate decomp levels if auto (0) diff --git a/video_encoder/src/encoder_tav.c b/video_encoder/src/encoder_tav.c index 9b1c5c6..498ec15 100644 --- a/video_encoder/src/encoder_tav.c +++ b/video_encoder/src/encoder_tav.c @@ -25,10 +25,43 @@ #include #include #include +#include #include "tav_encoder_lib.h" #include "encoder_tad.h" +// ============================================================================= +// Multithreading Structures +// ============================================================================= + +#define MAX_THREADS 16 +#define GOP_SLOT_EMPTY 0 +#define GOP_SLOT_READY 1 +#define GOP_SLOT_ENCODING 2 +#define GOP_SLOT_COMPLETE 3 + +typedef struct gop_job { + // Slot state + volatile int status; + + // Input data (owned by job) + uint8_t **rgb_frames; // Array of frame pointers [num_frames] + int num_frames; // Frames in this GOP + int *frame_numbers; // Frame indices for timecodes + int gop_index; // Sequential GOP number + + // Audio data (owned by job) + float *audio_samples; // Stereo PCM32f for this GOP + size_t num_audio_samples; // Samples per channel + + // Output data (filled by worker, owned by job) + tav_encoder_packet_t *packet; // Encoded video packet + int success; // 1 if encoding succeeded + + // Encoder params (copy for thread safety) + tav_encoder_params_t params; +} gop_job_t; + // ============================================================================= // Constants and Globals // ============================================================================= @@ -119,6 +152,15 @@ typedef struct { // Subtitle processing subtitle_entry_t *subtitles; + // Multithreading + int num_threads; // 0 = single-threaded, 1+ = num worker threads + gop_job_t *gop_jobs; // Array of GOP job slots [num_threads] + pthread_t *worker_threads; // Array of worker thread handles [num_threads] + pthread_mutex_t job_mutex; // Mutex for job slot access + pthread_cond_t job_ready; // Signal when a job slot is ready for encoding + pthread_cond_t job_complete; // Signal when a job slot is complete + volatile int shutdown_workers; // 1 when workers should exit + } cli_context_t; // ============================================================================= @@ -166,6 +208,9 @@ static void print_usage(const char *program) { printf(" --intra-only Disable temporal compression (I-frames only)\n"); printf(" --gop-size N GOP size 8/16/24 (default: 24)\n"); printf(" --single-pass Disable scene change detection\n"); + printf("\nPerformance:\n"); + printf(" -t, --threads N Parallel encoding threads (0=single-threaded, default: 0)\n"); + printf(" Each thread encodes one GOP independently\n"); // printf("\nTiling:\n"); // printf(" --monoblock Force single-tile mode (auto-disabled for > %dx%d)\n", // TAV_MONOBLOCK_MAX_WIDTH, TAV_MONOBLOCK_MAX_HEIGHT); @@ -200,7 +245,9 @@ static void print_usage(const char *program) { printf(" # Sports mode with larger GOP\n"); printf(" %s -i video.mp4 -o out.tav --preset-sports --gop-size 24\n\n", program); printf(" # Advanced: separate quality per channel\n"); - printf(" %s -i video.mp4 -o out.tav --quality-y 5 --quality-co 4 --quality-cg 3\n", program); + printf(" %s -i video.mp4 -o out.tav --quality-y 5 --quality-co 4 --quality-cg 3\n\n", program); + printf(" # Multithreaded encoding with 4 threads\n"); + printf(" %s -i video.mp4 -o out.tav -t 4 -q 3\n", program); } // ============================================================================= @@ -924,10 +971,664 @@ static int write_fontrom_packet(FILE *fp, const char *filename, uint8_t opcode, } // ============================================================================= -// Main Encoding Loop +// Worker Thread Functions +// ============================================================================= + +/** + * Worker thread context - passed to worker_thread_main. + */ +typedef struct { + cli_context_t *cli; + int thread_id; +} worker_context_t; + +/** + * Worker thread main function. + * Continuously picks up jobs from the job pool and encodes them. + */ +static void *worker_thread_main(void *arg) { + worker_context_t *wctx = (worker_context_t *)arg; + cli_context_t *cli = wctx->cli; + (void)wctx->thread_id; // Unused but kept for debugging + + while (1) { + pthread_mutex_lock(&cli->job_mutex); + + // Wait for a job or shutdown signal + while (!cli->shutdown_workers) { + // Look for a job slot that is ready to encode + int found_job = -1; + for (int i = 0; i < cli->num_threads; i++) { + if (cli->gop_jobs[i].status == GOP_SLOT_READY) { + cli->gop_jobs[i].status = GOP_SLOT_ENCODING; + found_job = i; + break; + } + } + + if (found_job >= 0) { + pthread_mutex_unlock(&cli->job_mutex); + + // Encode this GOP + gop_job_t *job = &cli->gop_jobs[found_job]; + + // Create thread-local encoder context + tav_encoder_context_t *ctx = tav_encoder_create(&job->params); + if (!ctx) { + fprintf(stderr, "Failed to create encoder for GOP %d\n", job->gop_index); + job->success = 0; + } else { + // Encode GOP + int result = tav_encoder_encode_gop(ctx, + (const uint8_t **)job->rgb_frames, + job->num_frames, + job->frame_numbers, + &job->packet); + job->success = (result == 1 && job->packet != NULL); + tav_encoder_free(ctx); + } + + // Mark job as complete (reacquire lock for next iteration) + pthread_mutex_lock(&cli->job_mutex); + job->status = GOP_SLOT_COMPLETE; + pthread_cond_broadcast(&cli->job_complete); + // Keep lock held for next iteration of inner while loop + continue; // Look for more jobs + } + + // No job found, wait for signal + pthread_cond_wait(&cli->job_ready, &cli->job_mutex); + } + + pthread_mutex_unlock(&cli->job_mutex); + break; // Shutdown + } + + free(wctx); + return NULL; +} + +/** + * Initialize multithreading resources. + * Returns 0 on success, -1 on failure. + */ +static int init_threading(cli_context_t *cli) { + if (cli->num_threads <= 0) { + return 0; // Single-threaded mode + } + + // Initialize mutex and condition variables + if (pthread_mutex_init(&cli->job_mutex, NULL) != 0) { + fprintf(stderr, "Error: Failed to initialize job mutex\n"); + return -1; + } + if (pthread_cond_init(&cli->job_ready, NULL) != 0) { + fprintf(stderr, "Error: Failed to initialize job_ready cond\n"); + pthread_mutex_destroy(&cli->job_mutex); + return -1; + } + if (pthread_cond_init(&cli->job_complete, NULL) != 0) { + fprintf(stderr, "Error: Failed to initialize job_complete cond\n"); + pthread_cond_destroy(&cli->job_ready); + pthread_mutex_destroy(&cli->job_mutex); + return -1; + } + + // Allocate job slots (one per thread) + cli->gop_jobs = calloc(cli->num_threads, sizeof(gop_job_t)); + if (!cli->gop_jobs) { + fprintf(stderr, "Error: Failed to allocate job slots\n"); + pthread_cond_destroy(&cli->job_complete); + pthread_cond_destroy(&cli->job_ready); + pthread_mutex_destroy(&cli->job_mutex); + return -1; + } + + // Allocate worker thread handles + cli->worker_threads = malloc(cli->num_threads * sizeof(pthread_t)); + if (!cli->worker_threads) { + fprintf(stderr, "Error: Failed to allocate thread handles\n"); + free(cli->gop_jobs); + pthread_cond_destroy(&cli->job_complete); + pthread_cond_destroy(&cli->job_ready); + pthread_mutex_destroy(&cli->job_mutex); + return -1; + } + + // Start worker threads + cli->shutdown_workers = 0; + for (int i = 0; i < cli->num_threads; i++) { + worker_context_t *wctx = malloc(sizeof(worker_context_t)); + if (!wctx) { + fprintf(stderr, "Error: Failed to allocate worker context\n"); + cli->shutdown_workers = 1; + pthread_cond_broadcast(&cli->job_ready); + for (int j = 0; j < i; j++) { + pthread_join(cli->worker_threads[j], NULL); + } + free(cli->worker_threads); + free(cli->gop_jobs); + pthread_cond_destroy(&cli->job_complete); + pthread_cond_destroy(&cli->job_ready); + pthread_mutex_destroy(&cli->job_mutex); + return -1; + } + wctx->cli = cli; + wctx->thread_id = i; + + if (pthread_create(&cli->worker_threads[i], NULL, worker_thread_main, wctx) != 0) { + fprintf(stderr, "Error: Failed to create worker thread %d\n", i); + free(wctx); + cli->shutdown_workers = 1; + pthread_cond_broadcast(&cli->job_ready); + for (int j = 0; j < i; j++) { + pthread_join(cli->worker_threads[j], NULL); + } + free(cli->worker_threads); + free(cli->gop_jobs); + pthread_cond_destroy(&cli->job_complete); + pthread_cond_destroy(&cli->job_ready); + pthread_mutex_destroy(&cli->job_mutex); + return -1; + } + } + + printf("Started %d worker threads for parallel GOP encoding\n", cli->num_threads); + return 0; +} + +/** + * Shutdown multithreading resources. + */ +static void shutdown_threading(cli_context_t *cli) { + if (cli->num_threads <= 0) { + return; + } + + // Signal workers to shutdown + pthread_mutex_lock(&cli->job_mutex); + cli->shutdown_workers = 1; + pthread_cond_broadcast(&cli->job_ready); + pthread_mutex_unlock(&cli->job_mutex); + + // Wait for all workers to finish + for (int i = 0; i < cli->num_threads; i++) { + pthread_join(cli->worker_threads[i], NULL); + } + + // Free job slots (and any remaining resources) + if (cli->gop_jobs) { + for (int i = 0; i < cli->num_threads; i++) { + if (cli->gop_jobs[i].packet) { + tav_encoder_free_packet(cli->gop_jobs[i].packet); + } + // Note: rgb_frames should already be freed by now + } + free(cli->gop_jobs); + cli->gop_jobs = NULL; + } + + if (cli->worker_threads) { + free(cli->worker_threads); + cli->worker_threads = NULL; + } + + pthread_cond_destroy(&cli->job_complete); + pthread_cond_destroy(&cli->job_ready); + pthread_mutex_destroy(&cli->job_mutex); +} + +// ============================================================================= +// Multithreaded Encoding Loop +// ============================================================================= + +/** + * Multithreaded video encoding function. + * Uses worker threads to encode GOPs in parallel. + */ +static int encode_video_mt(cli_context_t *cli) { + printf("Opening FFmpeg pipe...\n"); + cli->ffmpeg_pipe = open_ffmpeg_pipe(cli->input_file, + cli->enc_params.width, + cli->enc_params.height); + if (!cli->ffmpeg_pipe) { + return -1; + } + + // Create temporary encoder to get calculated params (decomp_levels, etc.) + printf("Creating encoder context...\n"); + tav_encoder_context_t *ctx = tav_encoder_create(&cli->enc_params); + if (!ctx) { + fprintf(stderr, "Error: %s\n", "Failed to create encoder"); + pclose(cli->ffmpeg_pipe); + return -1; + } + tav_encoder_get_params(ctx, &cli->enc_params); + tav_encoder_free(ctx); + ctx = NULL; + + // Initialize threading + if (init_threading(cli) < 0) { + pclose(cli->ffmpeg_pipe); + return -1; + } + + // Allocate per-job frame buffers + size_t frame_size = cli->enc_params.width * cli->enc_params.height * 3; + int gop_size = cli->enc_params.gop_size; + if (!cli->enc_params.enable_temporal_dwt) { + gop_size = 1; + } + + // Allocate frame buffers for each job slot + for (int slot = 0; slot < cli->num_threads; slot++) { + cli->gop_jobs[slot].rgb_frames = malloc(gop_size * sizeof(uint8_t*)); + cli->gop_jobs[slot].frame_numbers = malloc(gop_size * sizeof(int)); + if (!cli->gop_jobs[slot].rgb_frames || !cli->gop_jobs[slot].frame_numbers) { + fprintf(stderr, "Error: Failed to allocate job slot %d buffers\n", slot); + shutdown_threading(cli); + pclose(cli->ffmpeg_pipe); + return -1; + } + for (int f = 0; f < gop_size; f++) { + cli->gop_jobs[slot].rgb_frames[f] = malloc(frame_size); + if (!cli->gop_jobs[slot].rgb_frames[f]) { + fprintf(stderr, "Error: Failed to allocate frame buffer for slot %d\n", slot); + shutdown_threading(cli); + pclose(cli->ffmpeg_pipe); + return -1; + } + } + // Copy encoder params for thread safety + cli->gop_jobs[slot].params = cli->enc_params; + cli->gop_jobs[slot].status = GOP_SLOT_EMPTY; + cli->gop_jobs[slot].num_frames = 0; + } + + // Allocate audio buffers if needed + if (cli->has_audio) { + size_t max_gop_audio = gop_size * cli->samples_per_frame * 2; + cli->gop_audio_buffer = malloc(max_gop_audio * sizeof(float)); + cli->gop_audio_samples = 0; + if (!cli->gop_audio_buffer) { + fprintf(stderr, "Error: Failed to allocate GOP audio buffer\n"); + shutdown_threading(cli); + pclose(cli->ffmpeg_pipe); + return -1; + } + + // Allocate per-job audio buffers + for (int slot = 0; slot < cli->num_threads; slot++) { + cli->gop_jobs[slot].audio_samples = malloc(max_gop_audio * sizeof(float)); + if (!cli->gop_jobs[slot].audio_samples) { + fprintf(stderr, "Error: Failed to allocate audio buffer for slot %d\n", slot); + shutdown_threading(cli); + pclose(cli->ffmpeg_pipe); + return -1; + } + } + } + + // Temporary frame buffer for reading + uint8_t *rgb_frame = malloc(frame_size); + if (!rgb_frame) { + fprintf(stderr, "Error: Failed to allocate frame buffer\n"); + shutdown_threading(cli); + pclose(cli->ffmpeg_pipe); + return -1; + } + + // Write TAV header + write_tav_header(cli->output_fp, &cli->enc_params, cli->has_audio, cli->subtitles != NULL); + + // Write subtitles upfront + if (cli->subtitles) { + printf("Writing subtitles...\n"); + write_all_subtitles(cli->output_fp, cli->subtitles, cli->verbose); + } + + // Write font ROMs if provided + if (cli->fontrom_low) { + printf("Uploading low font ROM...\n"); + write_fontrom_packet(cli->output_fp, cli->fontrom_low, FONTROM_OPCODE_LOW, cli->verbose); + } + if (cli->fontrom_high) { + printf("Uploading high font ROM...\n"); + write_fontrom_packet(cli->output_fp, cli->fontrom_high, FONTROM_OPCODE_HIGH, cli->verbose); + } + + printf("Encoding frames with %d threads...\n", cli->num_threads); + cli->start_time = time(NULL); + + int current_slot = 0; // Slot being filled + int next_gop_to_write = 0; // GOP index that should be written next + int current_gop_index = 0; // Current GOP index being assembled + int frames_in_current_gop = 0; // Frames accumulated in current slot + int encoding_error = 0; + int eof_reached = 0; + + while (!encoding_error) { + // Step 1: Try to write any completed GOPs in order + pthread_mutex_lock(&cli->job_mutex); + while (!encoding_error) { + // Find the slot with the next GOP to write + int found = -1; + for (int i = 0; i < cli->num_threads; i++) { + if (cli->gop_jobs[i].status == GOP_SLOT_COMPLETE && + cli->gop_jobs[i].gop_index == next_gop_to_write) { + found = i; + break; + } + } + + if (found < 0) break; // No complete GOP ready to write + + gop_job_t *job = &cli->gop_jobs[found]; + pthread_mutex_unlock(&cli->job_mutex); + + // Write this GOP + if (job->success && job->packet) { + // Write TIMECODE + write_timecode_packet(cli->output_fp, job->frame_numbers[0], + cli->enc_params.fps_num, cli->enc_params.fps_den); + + // Write AUDIO for this GOP + if (cli->has_audio && job->num_audio_samples > 0) { + write_audio_packet(cli->output_fp, cli, job->audio_samples, job->num_audio_samples); + } + + // Write VIDEO packet + write_tav_packet(cli->output_fp, job->packet); + cli->total_bytes += job->packet->size; + cli->gop_count++; + + // Write GOP_SYNC + if (job->packet->packet_type == TAV_PACKET_GOP_UNIFIED) { + int frames_in_gop = job->packet->data[1]; + write_gop_sync_packet(cli->output_fp, frames_in_gop); + } else if (job->packet->packet_type == TAV_PACKET_IFRAME) { + write_gop_sync_packet(cli->output_fp, 1); + } + + tav_encoder_free_packet(job->packet); + job->packet = NULL; + } else { + fprintf(stderr, "Error: GOP %d encoding failed\n", job->gop_index); + encoding_error = 1; + } + + // Mark slot as empty + pthread_mutex_lock(&cli->job_mutex); + job->status = GOP_SLOT_EMPTY; + job->num_frames = 0; + next_gop_to_write++; + + // Progress + if (cli->verbose || cli->frame_count % 60 == 0) { + time_t elapsed = time(NULL) - cli->start_time; + double fps = elapsed > 0 ? (double)cli->frame_count / elapsed : 0.0; + double bitrate = elapsed > 0 ? + (cli->total_bytes * 8.0) / (cli->frame_count / ((double)cli->enc_params.fps_num / cli->enc_params.fps_den)) / 1000.0 : 0.0; + + printf("\rFrame %ld | GOPs: %ld | %.1f fps | %.1f kbps | %zu KB ", + cli->frame_count, cli->gop_count, fps, bitrate, + cli->total_bytes / 1024); + fflush(stdout); + } + } + pthread_mutex_unlock(&cli->job_mutex); + + if (encoding_error || eof_reached) break; + + // Step 2: Fill current GOP slot + gop_job_t *slot = &cli->gop_jobs[current_slot]; + + // Wait for slot to be empty (writing completed GOPs along the way) + pthread_mutex_lock(&cli->job_mutex); + while (slot->status != GOP_SLOT_EMPTY && !cli->shutdown_workers) { + // While waiting, check if we can write any completed GOPs + int wrote_something = 0; + for (int i = 0; i < cli->num_threads; i++) { + if (cli->gop_jobs[i].status == GOP_SLOT_COMPLETE && + cli->gop_jobs[i].gop_index == next_gop_to_write) { + gop_job_t *job = &cli->gop_jobs[i]; + pthread_mutex_unlock(&cli->job_mutex); + + // Write this GOP + if (job->success && job->packet) { + write_timecode_packet(cli->output_fp, job->frame_numbers[0], + cli->enc_params.fps_num, cli->enc_params.fps_den); + if (cli->has_audio && job->num_audio_samples > 0) { + write_audio_packet(cli->output_fp, cli, job->audio_samples, job->num_audio_samples); + } + write_tav_packet(cli->output_fp, job->packet); + cli->total_bytes += job->packet->size; + cli->gop_count++; + + if (job->packet->packet_type == TAV_PACKET_GOP_UNIFIED) { + write_gop_sync_packet(cli->output_fp, job->packet->data[1]); + } else if (job->packet->packet_type == TAV_PACKET_IFRAME) { + write_gop_sync_packet(cli->output_fp, 1); + } + + tav_encoder_free_packet(job->packet); + job->packet = NULL; + + // Progress + time_t elapsed = time(NULL) - cli->start_time; + double fps = elapsed > 0 ? (double)cli->frame_count / elapsed : 0.0; + printf("\rFrame %ld | GOPs: %ld | %.1f fps | %zu KB ", + cli->frame_count, cli->gop_count, fps, cli->total_bytes / 1024); + fflush(stdout); + } + + pthread_mutex_lock(&cli->job_mutex); + job->status = GOP_SLOT_EMPTY; + job->num_frames = 0; + next_gop_to_write++; + wrote_something = 1; + break; + } + } + if (!wrote_something) { + pthread_cond_wait(&cli->job_complete, &cli->job_mutex); + } + } + pthread_mutex_unlock(&cli->job_mutex); + + // Reset audio accumulator only when starting a fresh GOP + if (frames_in_current_gop == 0) { + slot->num_audio_samples = 0; + } + + // Read frame from FFmpeg + if (cli->encode_limit > 0 && cli->frame_count >= cli->encode_limit) { + eof_reached = 1; + } else { + int result = read_rgb_frame(cli->ffmpeg_pipe, rgb_frame, frame_size); + if (result == 0) { + eof_reached = 1; + } else if (result < 0) { + fprintf(stderr, "Error reading frame\n"); + encoding_error = 1; + } else { + // Copy frame to slot buffer + memcpy(slot->rgb_frames[frames_in_current_gop], rgb_frame, frame_size); + slot->frame_numbers[frames_in_current_gop] = (int)cli->frame_count; + frames_in_current_gop++; + cli->frame_count++; + + // Accumulate audio + if (cli->has_audio && cli->audio_buffer) { + size_t samples_read = read_audio_samples(cli, cli->audio_buffer, cli->samples_per_frame); + if (samples_read > 0) { + memcpy(slot->audio_samples + slot->num_audio_samples * 2, + cli->audio_buffer, + samples_read * 2 * sizeof(float)); + slot->num_audio_samples += samples_read; + } + } + + // Check if GOP is complete + if (frames_in_current_gop >= gop_size) { + slot->num_frames = frames_in_current_gop; + slot->gop_index = current_gop_index; + + // Submit GOP to worker threads + pthread_mutex_lock(&cli->job_mutex); + slot->status = GOP_SLOT_READY; + pthread_cond_broadcast(&cli->job_ready); + pthread_mutex_unlock(&cli->job_mutex); + + // Move to next slot + current_slot = (current_slot + 1) % cli->num_threads; + current_gop_index++; + frames_in_current_gop = 0; + + // Note: audio reset moved to after we confirm slot is empty + } + } + } + } + + // Handle partial GOP at end + if (!encoding_error && frames_in_current_gop > 0) { + printf("\nEncoding final partial GOP (%d frames)...\n", frames_in_current_gop); + + gop_job_t *slot = &cli->gop_jobs[current_slot]; + slot->num_frames = frames_in_current_gop; + slot->gop_index = current_gop_index; + + pthread_mutex_lock(&cli->job_mutex); + slot->status = GOP_SLOT_READY; + pthread_cond_broadcast(&cli->job_ready); + pthread_mutex_unlock(&cli->job_mutex); + + current_gop_index++; + } + + // Wait for all remaining GOPs to complete and write them + while (!encoding_error && next_gop_to_write < current_gop_index) { + pthread_mutex_lock(&cli->job_mutex); + + // Find slot with next GOP to write + int found = -1; + while (found < 0 && !encoding_error) { + for (int i = 0; i < cli->num_threads; i++) { + if (cli->gop_jobs[i].status == GOP_SLOT_COMPLETE && + cli->gop_jobs[i].gop_index == next_gop_to_write) { + found = i; + break; + } + } + if (found < 0) { + pthread_cond_wait(&cli->job_complete, &cli->job_mutex); + } + } + + if (found >= 0) { + gop_job_t *job = &cli->gop_jobs[found]; + pthread_mutex_unlock(&cli->job_mutex); + + if (job->success && job->packet) { + write_timecode_packet(cli->output_fp, job->frame_numbers[0], + cli->enc_params.fps_num, cli->enc_params.fps_den); + if (cli->has_audio && job->num_audio_samples > 0) { + write_audio_packet(cli->output_fp, cli, job->audio_samples, job->num_audio_samples); + } + write_tav_packet(cli->output_fp, job->packet); + cli->total_bytes += job->packet->size; + cli->gop_count++; + + if (job->packet->packet_type == TAV_PACKET_GOP_UNIFIED) { + write_gop_sync_packet(cli->output_fp, job->packet->data[1]); + } else if (job->packet->packet_type == TAV_PACKET_IFRAME) { + write_gop_sync_packet(cli->output_fp, 1); + } + + tav_encoder_free_packet(job->packet); + job->packet = NULL; + } + + pthread_mutex_lock(&cli->job_mutex); + job->status = GOP_SLOT_EMPTY; + next_gop_to_write++; + pthread_mutex_unlock(&cli->job_mutex); + } else { + pthread_mutex_unlock(&cli->job_mutex); + } + } + + printf("\n"); + + // Update total frames in header + update_total_frames(cli->output_fp, (uint32_t)cli->frame_count); + + // Free per-job frame buffers (must be done before shutdown_threading) + for (int slot = 0; slot < cli->num_threads; slot++) { + if (cli->gop_jobs[slot].rgb_frames) { + for (int f = 0; f < gop_size; f++) { + free(cli->gop_jobs[slot].rgb_frames[f]); + } + free(cli->gop_jobs[slot].rgb_frames); + cli->gop_jobs[slot].rgb_frames = NULL; + } + free(cli->gop_jobs[slot].frame_numbers); + cli->gop_jobs[slot].frame_numbers = NULL; + free(cli->gop_jobs[slot].audio_samples); + cli->gop_jobs[slot].audio_samples = NULL; + } + + // Cleanup + free(rgb_frame); + shutdown_threading(cli); + pclose(cli->ffmpeg_pipe); + + // Cleanup audio + if (cli->audio_buffer) { + free(cli->audio_buffer); + cli->audio_buffer = NULL; + } + if (cli->gop_audio_buffer) { + free(cli->gop_audio_buffer); + cli->gop_audio_buffer = NULL; + } + if (cli->pcm_file) { + fclose(cli->pcm_file); + cli->pcm_file = NULL; + } + if (cli->has_audio) { + unlink(TEMP_PCM_FILE); + } + + // Final statistics + time_t total_time = time(NULL) - cli->start_time; + double avg_fps = total_time > 0 ? (double)cli->frame_count / total_time : 0.0; + double duration = (double)cli->frame_count / ((double)cli->enc_params.fps_num / cli->enc_params.fps_den); + double avg_bitrate = duration > 0 ? (cli->total_bytes * 8.0) / duration / 1000.0 : 0.0; + + printf("\nEncoding complete! (multithreaded, %d threads)\n", cli->num_threads); + printf(" Frames encoded: %ld\n", cli->frame_count); + printf(" GOPs encoded: %ld\n", cli->gop_count); + printf(" Total size: %.2f MB\n", cli->total_bytes / (1024.0 * 1024.0)); + printf(" Duration: %.2f seconds\n", duration); + printf(" Average bitrate: %.1f kbps\n", avg_bitrate); + printf(" Encoding speed: %.1f fps\n", avg_fps); + printf(" Time taken: %ld seconds\n", total_time); + + return encoding_error ? -1 : 0; +} + +// ============================================================================= +// Single-Threaded Encoding Loop // ============================================================================= static int encode_video(cli_context_t *cli) { + // Dispatch to multithreaded version if threads > 0 + if (cli->num_threads > 0) { + return encode_video_mt(cli); + } + printf("Opening FFmpeg pipe...\n"); cli->ffmpeg_pipe = open_ffmpeg_pipe(cli->input_file, cli->enc_params.width, @@ -1313,12 +2014,13 @@ int main(int argc, char *argv[]) { {"preset-anime", no_argument, 0, 1027}, {"monoblock", no_argument, 0, 1028}, {"tiled", no_argument, 0, 1029}, + {"threads", required_argument, 0, 't'}, {"help", no_argument, 0, '?'}, {0, 0, 0, 0} }; int c, option_index = 0; - while ((c = getopt_long(argc, argv, "i:o:s:f:q:Q:w:c:v?", long_options, &option_index)) != -1) { + while ((c = getopt_long(argc, argv, "i:o:s:f:q:Q:w:c:t:v?", long_options, &option_index)) != -1) { switch (c) { case 'i': cli.input_file = strdup(optarg); @@ -1466,6 +2168,15 @@ int main(int argc, char *argv[]) { case 1029: // --tiled cli.enc_params.monoblock = 0; break; + case 't': { // --threads + int threads = atoi(optarg); + if (threads < 0 || threads > MAX_THREADS) { + fprintf(stderr, "Error: Thread count must be 0-%d (0=single-threaded)\n", MAX_THREADS); + return 1; + } + cli.num_threads = threads; + break; + } case '?': default: print_usage(argv[0]); @@ -1572,6 +2283,11 @@ int main(int argc, char *argv[]) { // Encode video int result = encode_video(&cli); + // Print output file before cleanup frees the string + if (result >= 0) { + printf("\nOutput written to: %s\n", cli.output_file); + } + // Cleanup fclose(cli.output_fp); free(cli.input_file); @@ -1594,6 +2310,5 @@ int main(int argc, char *argv[]) { return 1; } - printf("\nOutput written to: %s\n", cli.output_file); return 0; }