tavenc: multithreaded encoding

This commit is contained in:
minjaesong
2025-12-08 11:06:03 +09:00
parent 9b72a62cdb
commit 34a1f0e3db
2 changed files with 729 additions and 14 deletions

View File

@@ -392,26 +392,26 @@ tav_encoder_context_t *tav_encoder_create(const tav_encoder_params_t *params) {
// Auto mode: use monoblock for <= D1 PAL, tiled for larger
if (ctx->width > TAV_MONOBLOCK_MAX_WIDTH || ctx->height > TAV_MONOBLOCK_MAX_HEIGHT) {
ctx->monoblock = 0;
// if (ctx->verbose) {
if (ctx->verbose) {
printf("Auto-selected Padded Tiling mode: %dx%d exceeds D1 PAL threshold (%dx%d)\n",
ctx->width, ctx->height, TAV_MONOBLOCK_MAX_WIDTH, TAV_MONOBLOCK_MAX_HEIGHT);
// }
}
} else {
ctx->monoblock = 1;
// if (ctx->verbose) {
if (ctx->verbose) {
printf("Auto-selected Monoblock mode: %dx%d within D1 PAL threshold\n",
ctx->width, ctx->height);
// }
}
}
} else if (ctx->monoblock == 0) {
// if (ctx->verbose) {
if (ctx->verbose) {
printf("Forced Padded Tiling mode (--tiled)\n");
// }
}
} else {
// monoblock == 1: force monoblock even for large dimensions
// if (ctx->verbose) {
if (ctx->verbose) {
printf("Forced Monoblock mode (--monoblock)\n");
// }
}
}
// Calculate tile dimensions based on monoblock setting
@@ -423,10 +423,10 @@ tav_encoder_context_t *tav_encoder_create(const tav_encoder_params_t *params) {
// Padded Tiling mode: multiple tiles of TILE_SIZE_X × TILE_SIZE_Y
ctx->tiles_x = (ctx->width + TAV_TILE_SIZE_X - 1) / TAV_TILE_SIZE_X;
ctx->tiles_y = (ctx->height + TAV_TILE_SIZE_Y - 1) / TAV_TILE_SIZE_Y;
// if (ctx->verbose) {
if (ctx->verbose) {
printf("Padded Tiling mode: %dx%d tiles (%d total)\n",
ctx->tiles_x, ctx->tiles_y, ctx->tiles_x * ctx->tiles_y);
// }
}
}
// Calculate decomp levels if auto (0)

View File

@@ -25,10 +25,43 @@
#include <time.h>
#include <unistd.h>
#include <sys/stat.h>
#include <pthread.h>
#include "tav_encoder_lib.h"
#include "encoder_tad.h"
// =============================================================================
// Multithreading Structures
// =============================================================================
#define MAX_THREADS 16
#define GOP_SLOT_EMPTY 0
#define GOP_SLOT_READY 1
#define GOP_SLOT_ENCODING 2
#define GOP_SLOT_COMPLETE 3
typedef struct gop_job {
// Slot state
volatile int status;
// Input data (owned by job)
uint8_t **rgb_frames; // Array of frame pointers [num_frames]
int num_frames; // Frames in this GOP
int *frame_numbers; // Frame indices for timecodes
int gop_index; // Sequential GOP number
// Audio data (owned by job)
float *audio_samples; // Stereo PCM32f for this GOP
size_t num_audio_samples; // Samples per channel
// Output data (filled by worker, owned by job)
tav_encoder_packet_t *packet; // Encoded video packet
int success; // 1 if encoding succeeded
// Encoder params (copy for thread safety)
tav_encoder_params_t params;
} gop_job_t;
// =============================================================================
// Constants and Globals
// =============================================================================
@@ -119,6 +152,15 @@ typedef struct {
// Subtitle processing
subtitle_entry_t *subtitles;
// Multithreading
int num_threads; // 0 = single-threaded, 1+ = num worker threads
gop_job_t *gop_jobs; // Array of GOP job slots [num_threads]
pthread_t *worker_threads; // Array of worker thread handles [num_threads]
pthread_mutex_t job_mutex; // Mutex for job slot access
pthread_cond_t job_ready; // Signal when a job slot is ready for encoding
pthread_cond_t job_complete; // Signal when a job slot is complete
volatile int shutdown_workers; // 1 when workers should exit
} cli_context_t;
// =============================================================================
@@ -166,6 +208,9 @@ static void print_usage(const char *program) {
printf(" --intra-only Disable temporal compression (I-frames only)\n");
printf(" --gop-size N GOP size 8/16/24 (default: 24)\n");
printf(" --single-pass Disable scene change detection\n");
printf("\nPerformance:\n");
printf(" -t, --threads N Parallel encoding threads (0=single-threaded, default: 0)\n");
printf(" Each thread encodes one GOP independently\n");
// printf("\nTiling:\n");
// printf(" --monoblock Force single-tile mode (auto-disabled for > %dx%d)\n",
// TAV_MONOBLOCK_MAX_WIDTH, TAV_MONOBLOCK_MAX_HEIGHT);
@@ -200,7 +245,9 @@ static void print_usage(const char *program) {
printf(" # Sports mode with larger GOP\n");
printf(" %s -i video.mp4 -o out.tav --preset-sports --gop-size 24\n\n", program);
printf(" # Advanced: separate quality per channel\n");
printf(" %s -i video.mp4 -o out.tav --quality-y 5 --quality-co 4 --quality-cg 3\n", program);
printf(" %s -i video.mp4 -o out.tav --quality-y 5 --quality-co 4 --quality-cg 3\n\n", program);
printf(" # Multithreaded encoding with 4 threads\n");
printf(" %s -i video.mp4 -o out.tav -t 4 -q 3\n", program);
}
// =============================================================================
@@ -924,10 +971,664 @@ static int write_fontrom_packet(FILE *fp, const char *filename, uint8_t opcode,
}
// =============================================================================
// Main Encoding Loop
// Worker Thread Functions
// =============================================================================
/**
* Worker thread context - passed to worker_thread_main.
*/
typedef struct {
cli_context_t *cli;
int thread_id;
} worker_context_t;
/**
* Worker thread main function.
* Continuously picks up jobs from the job pool and encodes them.
*/
static void *worker_thread_main(void *arg) {
worker_context_t *wctx = (worker_context_t *)arg;
cli_context_t *cli = wctx->cli;
(void)wctx->thread_id; // Unused but kept for debugging
while (1) {
pthread_mutex_lock(&cli->job_mutex);
// Wait for a job or shutdown signal
while (!cli->shutdown_workers) {
// Look for a job slot that is ready to encode
int found_job = -1;
for (int i = 0; i < cli->num_threads; i++) {
if (cli->gop_jobs[i].status == GOP_SLOT_READY) {
cli->gop_jobs[i].status = GOP_SLOT_ENCODING;
found_job = i;
break;
}
}
if (found_job >= 0) {
pthread_mutex_unlock(&cli->job_mutex);
// Encode this GOP
gop_job_t *job = &cli->gop_jobs[found_job];
// Create thread-local encoder context
tav_encoder_context_t *ctx = tav_encoder_create(&job->params);
if (!ctx) {
fprintf(stderr, "Failed to create encoder for GOP %d\n", job->gop_index);
job->success = 0;
} else {
// Encode GOP
int result = tav_encoder_encode_gop(ctx,
(const uint8_t **)job->rgb_frames,
job->num_frames,
job->frame_numbers,
&job->packet);
job->success = (result == 1 && job->packet != NULL);
tav_encoder_free(ctx);
}
// Mark job as complete (reacquire lock for next iteration)
pthread_mutex_lock(&cli->job_mutex);
job->status = GOP_SLOT_COMPLETE;
pthread_cond_broadcast(&cli->job_complete);
// Keep lock held for next iteration of inner while loop
continue; // Look for more jobs
}
// No job found, wait for signal
pthread_cond_wait(&cli->job_ready, &cli->job_mutex);
}
pthread_mutex_unlock(&cli->job_mutex);
break; // Shutdown
}
free(wctx);
return NULL;
}
/**
* Initialize multithreading resources.
* Returns 0 on success, -1 on failure.
*/
static int init_threading(cli_context_t *cli) {
if (cli->num_threads <= 0) {
return 0; // Single-threaded mode
}
// Initialize mutex and condition variables
if (pthread_mutex_init(&cli->job_mutex, NULL) != 0) {
fprintf(stderr, "Error: Failed to initialize job mutex\n");
return -1;
}
if (pthread_cond_init(&cli->job_ready, NULL) != 0) {
fprintf(stderr, "Error: Failed to initialize job_ready cond\n");
pthread_mutex_destroy(&cli->job_mutex);
return -1;
}
if (pthread_cond_init(&cli->job_complete, NULL) != 0) {
fprintf(stderr, "Error: Failed to initialize job_complete cond\n");
pthread_cond_destroy(&cli->job_ready);
pthread_mutex_destroy(&cli->job_mutex);
return -1;
}
// Allocate job slots (one per thread)
cli->gop_jobs = calloc(cli->num_threads, sizeof(gop_job_t));
if (!cli->gop_jobs) {
fprintf(stderr, "Error: Failed to allocate job slots\n");
pthread_cond_destroy(&cli->job_complete);
pthread_cond_destroy(&cli->job_ready);
pthread_mutex_destroy(&cli->job_mutex);
return -1;
}
// Allocate worker thread handles
cli->worker_threads = malloc(cli->num_threads * sizeof(pthread_t));
if (!cli->worker_threads) {
fprintf(stderr, "Error: Failed to allocate thread handles\n");
free(cli->gop_jobs);
pthread_cond_destroy(&cli->job_complete);
pthread_cond_destroy(&cli->job_ready);
pthread_mutex_destroy(&cli->job_mutex);
return -1;
}
// Start worker threads
cli->shutdown_workers = 0;
for (int i = 0; i < cli->num_threads; i++) {
worker_context_t *wctx = malloc(sizeof(worker_context_t));
if (!wctx) {
fprintf(stderr, "Error: Failed to allocate worker context\n");
cli->shutdown_workers = 1;
pthread_cond_broadcast(&cli->job_ready);
for (int j = 0; j < i; j++) {
pthread_join(cli->worker_threads[j], NULL);
}
free(cli->worker_threads);
free(cli->gop_jobs);
pthread_cond_destroy(&cli->job_complete);
pthread_cond_destroy(&cli->job_ready);
pthread_mutex_destroy(&cli->job_mutex);
return -1;
}
wctx->cli = cli;
wctx->thread_id = i;
if (pthread_create(&cli->worker_threads[i], NULL, worker_thread_main, wctx) != 0) {
fprintf(stderr, "Error: Failed to create worker thread %d\n", i);
free(wctx);
cli->shutdown_workers = 1;
pthread_cond_broadcast(&cli->job_ready);
for (int j = 0; j < i; j++) {
pthread_join(cli->worker_threads[j], NULL);
}
free(cli->worker_threads);
free(cli->gop_jobs);
pthread_cond_destroy(&cli->job_complete);
pthread_cond_destroy(&cli->job_ready);
pthread_mutex_destroy(&cli->job_mutex);
return -1;
}
}
printf("Started %d worker threads for parallel GOP encoding\n", cli->num_threads);
return 0;
}
/**
* Shutdown multithreading resources.
*/
static void shutdown_threading(cli_context_t *cli) {
if (cli->num_threads <= 0) {
return;
}
// Signal workers to shutdown
pthread_mutex_lock(&cli->job_mutex);
cli->shutdown_workers = 1;
pthread_cond_broadcast(&cli->job_ready);
pthread_mutex_unlock(&cli->job_mutex);
// Wait for all workers to finish
for (int i = 0; i < cli->num_threads; i++) {
pthread_join(cli->worker_threads[i], NULL);
}
// Free job slots (and any remaining resources)
if (cli->gop_jobs) {
for (int i = 0; i < cli->num_threads; i++) {
if (cli->gop_jobs[i].packet) {
tav_encoder_free_packet(cli->gop_jobs[i].packet);
}
// Note: rgb_frames should already be freed by now
}
free(cli->gop_jobs);
cli->gop_jobs = NULL;
}
if (cli->worker_threads) {
free(cli->worker_threads);
cli->worker_threads = NULL;
}
pthread_cond_destroy(&cli->job_complete);
pthread_cond_destroy(&cli->job_ready);
pthread_mutex_destroy(&cli->job_mutex);
}
// =============================================================================
// Multithreaded Encoding Loop
// =============================================================================
/**
* Multithreaded video encoding function.
* Uses worker threads to encode GOPs in parallel.
*/
static int encode_video_mt(cli_context_t *cli) {
printf("Opening FFmpeg pipe...\n");
cli->ffmpeg_pipe = open_ffmpeg_pipe(cli->input_file,
cli->enc_params.width,
cli->enc_params.height);
if (!cli->ffmpeg_pipe) {
return -1;
}
// Create temporary encoder to get calculated params (decomp_levels, etc.)
printf("Creating encoder context...\n");
tav_encoder_context_t *ctx = tav_encoder_create(&cli->enc_params);
if (!ctx) {
fprintf(stderr, "Error: %s\n", "Failed to create encoder");
pclose(cli->ffmpeg_pipe);
return -1;
}
tav_encoder_get_params(ctx, &cli->enc_params);
tav_encoder_free(ctx);
ctx = NULL;
// Initialize threading
if (init_threading(cli) < 0) {
pclose(cli->ffmpeg_pipe);
return -1;
}
// Allocate per-job frame buffers
size_t frame_size = cli->enc_params.width * cli->enc_params.height * 3;
int gop_size = cli->enc_params.gop_size;
if (!cli->enc_params.enable_temporal_dwt) {
gop_size = 1;
}
// Allocate frame buffers for each job slot
for (int slot = 0; slot < cli->num_threads; slot++) {
cli->gop_jobs[slot].rgb_frames = malloc(gop_size * sizeof(uint8_t*));
cli->gop_jobs[slot].frame_numbers = malloc(gop_size * sizeof(int));
if (!cli->gop_jobs[slot].rgb_frames || !cli->gop_jobs[slot].frame_numbers) {
fprintf(stderr, "Error: Failed to allocate job slot %d buffers\n", slot);
shutdown_threading(cli);
pclose(cli->ffmpeg_pipe);
return -1;
}
for (int f = 0; f < gop_size; f++) {
cli->gop_jobs[slot].rgb_frames[f] = malloc(frame_size);
if (!cli->gop_jobs[slot].rgb_frames[f]) {
fprintf(stderr, "Error: Failed to allocate frame buffer for slot %d\n", slot);
shutdown_threading(cli);
pclose(cli->ffmpeg_pipe);
return -1;
}
}
// Copy encoder params for thread safety
cli->gop_jobs[slot].params = cli->enc_params;
cli->gop_jobs[slot].status = GOP_SLOT_EMPTY;
cli->gop_jobs[slot].num_frames = 0;
}
// Allocate audio buffers if needed
if (cli->has_audio) {
size_t max_gop_audio = gop_size * cli->samples_per_frame * 2;
cli->gop_audio_buffer = malloc(max_gop_audio * sizeof(float));
cli->gop_audio_samples = 0;
if (!cli->gop_audio_buffer) {
fprintf(stderr, "Error: Failed to allocate GOP audio buffer\n");
shutdown_threading(cli);
pclose(cli->ffmpeg_pipe);
return -1;
}
// Allocate per-job audio buffers
for (int slot = 0; slot < cli->num_threads; slot++) {
cli->gop_jobs[slot].audio_samples = malloc(max_gop_audio * sizeof(float));
if (!cli->gop_jobs[slot].audio_samples) {
fprintf(stderr, "Error: Failed to allocate audio buffer for slot %d\n", slot);
shutdown_threading(cli);
pclose(cli->ffmpeg_pipe);
return -1;
}
}
}
// Temporary frame buffer for reading
uint8_t *rgb_frame = malloc(frame_size);
if (!rgb_frame) {
fprintf(stderr, "Error: Failed to allocate frame buffer\n");
shutdown_threading(cli);
pclose(cli->ffmpeg_pipe);
return -1;
}
// Write TAV header
write_tav_header(cli->output_fp, &cli->enc_params, cli->has_audio, cli->subtitles != NULL);
// Write subtitles upfront
if (cli->subtitles) {
printf("Writing subtitles...\n");
write_all_subtitles(cli->output_fp, cli->subtitles, cli->verbose);
}
// Write font ROMs if provided
if (cli->fontrom_low) {
printf("Uploading low font ROM...\n");
write_fontrom_packet(cli->output_fp, cli->fontrom_low, FONTROM_OPCODE_LOW, cli->verbose);
}
if (cli->fontrom_high) {
printf("Uploading high font ROM...\n");
write_fontrom_packet(cli->output_fp, cli->fontrom_high, FONTROM_OPCODE_HIGH, cli->verbose);
}
printf("Encoding frames with %d threads...\n", cli->num_threads);
cli->start_time = time(NULL);
int current_slot = 0; // Slot being filled
int next_gop_to_write = 0; // GOP index that should be written next
int current_gop_index = 0; // Current GOP index being assembled
int frames_in_current_gop = 0; // Frames accumulated in current slot
int encoding_error = 0;
int eof_reached = 0;
while (!encoding_error) {
// Step 1: Try to write any completed GOPs in order
pthread_mutex_lock(&cli->job_mutex);
while (!encoding_error) {
// Find the slot with the next GOP to write
int found = -1;
for (int i = 0; i < cli->num_threads; i++) {
if (cli->gop_jobs[i].status == GOP_SLOT_COMPLETE &&
cli->gop_jobs[i].gop_index == next_gop_to_write) {
found = i;
break;
}
}
if (found < 0) break; // No complete GOP ready to write
gop_job_t *job = &cli->gop_jobs[found];
pthread_mutex_unlock(&cli->job_mutex);
// Write this GOP
if (job->success && job->packet) {
// Write TIMECODE
write_timecode_packet(cli->output_fp, job->frame_numbers[0],
cli->enc_params.fps_num, cli->enc_params.fps_den);
// Write AUDIO for this GOP
if (cli->has_audio && job->num_audio_samples > 0) {
write_audio_packet(cli->output_fp, cli, job->audio_samples, job->num_audio_samples);
}
// Write VIDEO packet
write_tav_packet(cli->output_fp, job->packet);
cli->total_bytes += job->packet->size;
cli->gop_count++;
// Write GOP_SYNC
if (job->packet->packet_type == TAV_PACKET_GOP_UNIFIED) {
int frames_in_gop = job->packet->data[1];
write_gop_sync_packet(cli->output_fp, frames_in_gop);
} else if (job->packet->packet_type == TAV_PACKET_IFRAME) {
write_gop_sync_packet(cli->output_fp, 1);
}
tav_encoder_free_packet(job->packet);
job->packet = NULL;
} else {
fprintf(stderr, "Error: GOP %d encoding failed\n", job->gop_index);
encoding_error = 1;
}
// Mark slot as empty
pthread_mutex_lock(&cli->job_mutex);
job->status = GOP_SLOT_EMPTY;
job->num_frames = 0;
next_gop_to_write++;
// Progress
if (cli->verbose || cli->frame_count % 60 == 0) {
time_t elapsed = time(NULL) - cli->start_time;
double fps = elapsed > 0 ? (double)cli->frame_count / elapsed : 0.0;
double bitrate = elapsed > 0 ?
(cli->total_bytes * 8.0) / (cli->frame_count / ((double)cli->enc_params.fps_num / cli->enc_params.fps_den)) / 1000.0 : 0.0;
printf("\rFrame %ld | GOPs: %ld | %.1f fps | %.1f kbps | %zu KB ",
cli->frame_count, cli->gop_count, fps, bitrate,
cli->total_bytes / 1024);
fflush(stdout);
}
}
pthread_mutex_unlock(&cli->job_mutex);
if (encoding_error || eof_reached) break;
// Step 2: Fill current GOP slot
gop_job_t *slot = &cli->gop_jobs[current_slot];
// Wait for slot to be empty (writing completed GOPs along the way)
pthread_mutex_lock(&cli->job_mutex);
while (slot->status != GOP_SLOT_EMPTY && !cli->shutdown_workers) {
// While waiting, check if we can write any completed GOPs
int wrote_something = 0;
for (int i = 0; i < cli->num_threads; i++) {
if (cli->gop_jobs[i].status == GOP_SLOT_COMPLETE &&
cli->gop_jobs[i].gop_index == next_gop_to_write) {
gop_job_t *job = &cli->gop_jobs[i];
pthread_mutex_unlock(&cli->job_mutex);
// Write this GOP
if (job->success && job->packet) {
write_timecode_packet(cli->output_fp, job->frame_numbers[0],
cli->enc_params.fps_num, cli->enc_params.fps_den);
if (cli->has_audio && job->num_audio_samples > 0) {
write_audio_packet(cli->output_fp, cli, job->audio_samples, job->num_audio_samples);
}
write_tav_packet(cli->output_fp, job->packet);
cli->total_bytes += job->packet->size;
cli->gop_count++;
if (job->packet->packet_type == TAV_PACKET_GOP_UNIFIED) {
write_gop_sync_packet(cli->output_fp, job->packet->data[1]);
} else if (job->packet->packet_type == TAV_PACKET_IFRAME) {
write_gop_sync_packet(cli->output_fp, 1);
}
tav_encoder_free_packet(job->packet);
job->packet = NULL;
// Progress
time_t elapsed = time(NULL) - cli->start_time;
double fps = elapsed > 0 ? (double)cli->frame_count / elapsed : 0.0;
printf("\rFrame %ld | GOPs: %ld | %.1f fps | %zu KB ",
cli->frame_count, cli->gop_count, fps, cli->total_bytes / 1024);
fflush(stdout);
}
pthread_mutex_lock(&cli->job_mutex);
job->status = GOP_SLOT_EMPTY;
job->num_frames = 0;
next_gop_to_write++;
wrote_something = 1;
break;
}
}
if (!wrote_something) {
pthread_cond_wait(&cli->job_complete, &cli->job_mutex);
}
}
pthread_mutex_unlock(&cli->job_mutex);
// Reset audio accumulator only when starting a fresh GOP
if (frames_in_current_gop == 0) {
slot->num_audio_samples = 0;
}
// Read frame from FFmpeg
if (cli->encode_limit > 0 && cli->frame_count >= cli->encode_limit) {
eof_reached = 1;
} else {
int result = read_rgb_frame(cli->ffmpeg_pipe, rgb_frame, frame_size);
if (result == 0) {
eof_reached = 1;
} else if (result < 0) {
fprintf(stderr, "Error reading frame\n");
encoding_error = 1;
} else {
// Copy frame to slot buffer
memcpy(slot->rgb_frames[frames_in_current_gop], rgb_frame, frame_size);
slot->frame_numbers[frames_in_current_gop] = (int)cli->frame_count;
frames_in_current_gop++;
cli->frame_count++;
// Accumulate audio
if (cli->has_audio && cli->audio_buffer) {
size_t samples_read = read_audio_samples(cli, cli->audio_buffer, cli->samples_per_frame);
if (samples_read > 0) {
memcpy(slot->audio_samples + slot->num_audio_samples * 2,
cli->audio_buffer,
samples_read * 2 * sizeof(float));
slot->num_audio_samples += samples_read;
}
}
// Check if GOP is complete
if (frames_in_current_gop >= gop_size) {
slot->num_frames = frames_in_current_gop;
slot->gop_index = current_gop_index;
// Submit GOP to worker threads
pthread_mutex_lock(&cli->job_mutex);
slot->status = GOP_SLOT_READY;
pthread_cond_broadcast(&cli->job_ready);
pthread_mutex_unlock(&cli->job_mutex);
// Move to next slot
current_slot = (current_slot + 1) % cli->num_threads;
current_gop_index++;
frames_in_current_gop = 0;
// Note: audio reset moved to after we confirm slot is empty
}
}
}
}
// Handle partial GOP at end
if (!encoding_error && frames_in_current_gop > 0) {
printf("\nEncoding final partial GOP (%d frames)...\n", frames_in_current_gop);
gop_job_t *slot = &cli->gop_jobs[current_slot];
slot->num_frames = frames_in_current_gop;
slot->gop_index = current_gop_index;
pthread_mutex_lock(&cli->job_mutex);
slot->status = GOP_SLOT_READY;
pthread_cond_broadcast(&cli->job_ready);
pthread_mutex_unlock(&cli->job_mutex);
current_gop_index++;
}
// Wait for all remaining GOPs to complete and write them
while (!encoding_error && next_gop_to_write < current_gop_index) {
pthread_mutex_lock(&cli->job_mutex);
// Find slot with next GOP to write
int found = -1;
while (found < 0 && !encoding_error) {
for (int i = 0; i < cli->num_threads; i++) {
if (cli->gop_jobs[i].status == GOP_SLOT_COMPLETE &&
cli->gop_jobs[i].gop_index == next_gop_to_write) {
found = i;
break;
}
}
if (found < 0) {
pthread_cond_wait(&cli->job_complete, &cli->job_mutex);
}
}
if (found >= 0) {
gop_job_t *job = &cli->gop_jobs[found];
pthread_mutex_unlock(&cli->job_mutex);
if (job->success && job->packet) {
write_timecode_packet(cli->output_fp, job->frame_numbers[0],
cli->enc_params.fps_num, cli->enc_params.fps_den);
if (cli->has_audio && job->num_audio_samples > 0) {
write_audio_packet(cli->output_fp, cli, job->audio_samples, job->num_audio_samples);
}
write_tav_packet(cli->output_fp, job->packet);
cli->total_bytes += job->packet->size;
cli->gop_count++;
if (job->packet->packet_type == TAV_PACKET_GOP_UNIFIED) {
write_gop_sync_packet(cli->output_fp, job->packet->data[1]);
} else if (job->packet->packet_type == TAV_PACKET_IFRAME) {
write_gop_sync_packet(cli->output_fp, 1);
}
tav_encoder_free_packet(job->packet);
job->packet = NULL;
}
pthread_mutex_lock(&cli->job_mutex);
job->status = GOP_SLOT_EMPTY;
next_gop_to_write++;
pthread_mutex_unlock(&cli->job_mutex);
} else {
pthread_mutex_unlock(&cli->job_mutex);
}
}
printf("\n");
// Update total frames in header
update_total_frames(cli->output_fp, (uint32_t)cli->frame_count);
// Free per-job frame buffers (must be done before shutdown_threading)
for (int slot = 0; slot < cli->num_threads; slot++) {
if (cli->gop_jobs[slot].rgb_frames) {
for (int f = 0; f < gop_size; f++) {
free(cli->gop_jobs[slot].rgb_frames[f]);
}
free(cli->gop_jobs[slot].rgb_frames);
cli->gop_jobs[slot].rgb_frames = NULL;
}
free(cli->gop_jobs[slot].frame_numbers);
cli->gop_jobs[slot].frame_numbers = NULL;
free(cli->gop_jobs[slot].audio_samples);
cli->gop_jobs[slot].audio_samples = NULL;
}
// Cleanup
free(rgb_frame);
shutdown_threading(cli);
pclose(cli->ffmpeg_pipe);
// Cleanup audio
if (cli->audio_buffer) {
free(cli->audio_buffer);
cli->audio_buffer = NULL;
}
if (cli->gop_audio_buffer) {
free(cli->gop_audio_buffer);
cli->gop_audio_buffer = NULL;
}
if (cli->pcm_file) {
fclose(cli->pcm_file);
cli->pcm_file = NULL;
}
if (cli->has_audio) {
unlink(TEMP_PCM_FILE);
}
// Final statistics
time_t total_time = time(NULL) - cli->start_time;
double avg_fps = total_time > 0 ? (double)cli->frame_count / total_time : 0.0;
double duration = (double)cli->frame_count / ((double)cli->enc_params.fps_num / cli->enc_params.fps_den);
double avg_bitrate = duration > 0 ? (cli->total_bytes * 8.0) / duration / 1000.0 : 0.0;
printf("\nEncoding complete! (multithreaded, %d threads)\n", cli->num_threads);
printf(" Frames encoded: %ld\n", cli->frame_count);
printf(" GOPs encoded: %ld\n", cli->gop_count);
printf(" Total size: %.2f MB\n", cli->total_bytes / (1024.0 * 1024.0));
printf(" Duration: %.2f seconds\n", duration);
printf(" Average bitrate: %.1f kbps\n", avg_bitrate);
printf(" Encoding speed: %.1f fps\n", avg_fps);
printf(" Time taken: %ld seconds\n", total_time);
return encoding_error ? -1 : 0;
}
// =============================================================================
// Single-Threaded Encoding Loop
// =============================================================================
static int encode_video(cli_context_t *cli) {
// Dispatch to multithreaded version if threads > 0
if (cli->num_threads > 0) {
return encode_video_mt(cli);
}
printf("Opening FFmpeg pipe...\n");
cli->ffmpeg_pipe = open_ffmpeg_pipe(cli->input_file,
cli->enc_params.width,
@@ -1313,12 +2014,13 @@ int main(int argc, char *argv[]) {
{"preset-anime", no_argument, 0, 1027},
{"monoblock", no_argument, 0, 1028},
{"tiled", no_argument, 0, 1029},
{"threads", required_argument, 0, 't'},
{"help", no_argument, 0, '?'},
{0, 0, 0, 0}
};
int c, option_index = 0;
while ((c = getopt_long(argc, argv, "i:o:s:f:q:Q:w:c:v?", long_options, &option_index)) != -1) {
while ((c = getopt_long(argc, argv, "i:o:s:f:q:Q:w:c:t:v?", long_options, &option_index)) != -1) {
switch (c) {
case 'i':
cli.input_file = strdup(optarg);
@@ -1466,6 +2168,15 @@ int main(int argc, char *argv[]) {
case 1029: // --tiled
cli.enc_params.monoblock = 0;
break;
case 't': { // --threads
int threads = atoi(optarg);
if (threads < 0 || threads > MAX_THREADS) {
fprintf(stderr, "Error: Thread count must be 0-%d (0=single-threaded)\n", MAX_THREADS);
return 1;
}
cli.num_threads = threads;
break;
}
case '?':
default:
print_usage(argv[0]);
@@ -1572,6 +2283,11 @@ int main(int argc, char *argv[]) {
// Encode video
int result = encode_video(&cli);
// Print output file before cleanup frees the string
if (result >= 0) {
printf("\nOutput written to: %s\n", cli.output_file);
}
// Cleanup
fclose(cli.output_fp);
free(cli.input_file);
@@ -1594,6 +2310,5 @@ int main(int argc, char *argv[]) {
return 1;
}
printf("\nOutput written to: %s\n", cli.output_file);
return 0;
}