tavdec: multithreaded audio decode

This commit is contained in:
minjaesong
2025-12-09 10:44:32 +09:00
parent 017aef26ab
commit 50092aef60
3 changed files with 374 additions and 21 deletions

View File

@@ -454,14 +454,13 @@ tav_encoder_context_t *tav_encoder_create(const tav_encoder_params_t *params) {
} }
// Auto-select temporal wavelet if still at default (255=Haar) and temporal DWT enabled // Auto-select temporal wavelet if still at default (255=Haar) and temporal DWT enabled
// Logic from old encoder: use Haar for large videos, CDF 5/3 for small/low-quality videos
if (ctx->enable_temporal_dwt && ctx->temporal_wavelet == 255) { if (ctx->enable_temporal_dwt && ctx->temporal_wavelet == 255) {
int num_pixels = ctx->width * ctx->height; int num_pixels = ctx->width * ctx->height;
int use_pure_haar = 0; int use_pure_haar = 0;
// Smart preset based on resolution and quality // Smart preset based on resolution and quality
// For large videos with reasonable quality, use Haar (better compression) // For large videos with reasonable quality, use Haar (better compression)
// For smaller videos or low quality, use CDF 5/3 (better detail preservation) // For smaller videos or low quality, use Haar with sports mode (better motion preservation)
if ((num_pixels >= 820000 && ctx->quantiser_y <= 29) || if ((num_pixels >= 820000 && ctx->quantiser_y <= 29) ||
(num_pixels >= 500000 && ctx->quantiser_y <= 14) || (num_pixels >= 500000 && ctx->quantiser_y <= 14) ||
(num_pixels >= 340000 && ctx->quantiser_y <= 7) || (num_pixels >= 340000 && ctx->quantiser_y <= 7) ||

View File

@@ -81,6 +81,22 @@ typedef struct {
} gop_decode_job_t; } gop_decode_job_t;
// =============================================================================
// Audio Decode Job Structure (for multithreading)
// =============================================================================
typedef struct {
long file_offset; // File position for reading
uint32_t payload_size; // Size of compressed audio data
uint16_t sample_count; // Expected sample count
uint8_t packet_type; // TAD_PACKET_AUDIO_TAD or TAD_PACKET_AUDIO_PCM8
// Output (decoded PCM data)
uint8_t *decoded_pcm; // Stereo PCMu8 output
size_t decoded_samples; // Actual samples decoded
volatile int status; // DECODE_SLOT_*
} audio_decode_job_t;
// ============================================================================= // =============================================================================
// TAV Header Structure (32 bytes) // TAV Header Structure (32 bytes)
// ============================================================================= // =============================================================================
@@ -151,7 +167,7 @@ typedef struct {
int no_audio; // Skip audio decoding int no_audio; // Skip audio decoding
int dump_packets; // Debug: dump packet info int dump_packets; // Debug: dump packet info
// Threading support // Threading support (video decoding)
int num_threads; int num_threads;
int num_slots; int num_slots;
gop_decode_job_t *slots; gop_decode_job_t *slots;
@@ -166,6 +182,18 @@ typedef struct {
volatile int jobs_submitted; volatile int jobs_submitted;
volatile int jobs_completed; volatile int jobs_completed;
// Audio decoding (pass 1 multithreading)
audio_decode_job_t *audio_jobs;
int audio_job_count;
int audio_job_capacity;
pthread_t *audio_worker_threads;
int num_audio_threads;
pthread_mutex_t audio_mutex;
pthread_cond_t audio_cond_job_available;
volatile int audio_threads_should_exit;
volatile int next_audio_job; // Next job for worker threads to process
volatile int next_audio_write; // Next job to write to temp file
} decoder_context_t; } decoder_context_t;
// ============================================================================= // =============================================================================
@@ -289,9 +317,8 @@ static int spawn_ffmpeg(decoder_context_t *ctx) {
if (ctx->interlaced) { if (ctx->interlaced) {
// Interlaced mode: merge separate fields into interlaced frames // Interlaced mode: merge separate fields into interlaced frames
// tinterlace=interleave_top combines consecutive fields into interlaced frames // setfield=tff marks each frame as top-field, weave combines consecutive fields
// Output will be full height (header.height) at half framerate // into full-height interlaced frames at half framerate
// Field order is set to top-field-first to match encoder
if (ctx->output_raw) { if (ctx->output_raw) {
// Raw video output (no compression) // Raw video output (no compression)
execl("/usr/bin/ffmpeg", "ffmpeg", execl("/usr/bin/ffmpeg", "ffmpeg",
@@ -304,7 +331,7 @@ static int spawn_ffmpeg(decoder_context_t *ctx) {
"-ar", "32000", "-ar", "32000",
"-ac", "2", "-ac", "2",
"-i", ctx->audio_temp_file, "-i", ctx->audio_temp_file,
"-vf", "tinterlace=interleave_top", "-vf", "setfield=tff,weave",
"-field_order", "tt", "-field_order", "tt",
"-c:v", "rawvideo", "-c:v", "rawvideo",
"-pixel_format", "rgb24", "-pixel_format", "rgb24",
@@ -326,7 +353,7 @@ static int spawn_ffmpeg(decoder_context_t *ctx) {
"-ar", "32000", "-ar", "32000",
"-ac", "2", "-ac", "2",
"-i", ctx->audio_temp_file, "-i", ctx->audio_temp_file,
"-vf", "tinterlace=interleave_top", "-vf", "setfield=tff,weave",
"-field_order", "tt", "-field_order", "tt",
"-color_range", "2", "-color_range", "2",
"-c:v", "ffv1", "-c:v", "ffv1",
@@ -1352,6 +1379,292 @@ static int decode_video_pass2_mt(decoder_context_t *ctx) {
return 0; return 0;
} }
// =============================================================================
// Multithreaded Audio Extraction (Pass 1)
// =============================================================================
// Audio worker thread - decodes audio packets in parallel
static void *audio_worker_thread(void *arg) {
decoder_context_t *ctx = (decoder_context_t*)arg;
FILE *input_fp = fopen(ctx->input_file, "rb");
if (!input_fp) {
return NULL;
}
while (1) {
pthread_mutex_lock(&ctx->audio_mutex);
// Wait for job or exit signal
while (ctx->next_audio_job >= ctx->audio_job_count && !ctx->audio_threads_should_exit) {
pthread_cond_wait(&ctx->audio_cond_job_available, &ctx->audio_mutex);
}
if (ctx->audio_threads_should_exit) {
pthread_mutex_unlock(&ctx->audio_mutex);
break;
}
// Get next job
int job_idx = ctx->next_audio_job++;
pthread_mutex_unlock(&ctx->audio_mutex);
if (job_idx >= ctx->audio_job_count) break;
audio_decode_job_t *job = &ctx->audio_jobs[job_idx];
job->status = DECODE_SLOT_PROCESSING;
// Seek to packet location
fseek(input_fp, job->file_offset, SEEK_SET);
if (job->packet_type == TAV_PACKET_AUDIO_TAD) {
// Read TAD packet data
uint8_t *tad_data = malloc(job->payload_size);
if (tad_data && fread(tad_data, 1, job->payload_size, input_fp) == job->payload_size) {
// Allocate output buffer
job->decoded_pcm = malloc(job->sample_count * 2);
if (job->decoded_pcm) {
size_t bytes_consumed = 0;
int result = tad32_decode_chunk(tad_data, job->payload_size,
job->decoded_pcm, &bytes_consumed,
&job->decoded_samples);
if (result != 0) {
free(job->decoded_pcm);
job->decoded_pcm = NULL;
job->decoded_samples = 0;
}
}
free(tad_data);
}
} else if (job->packet_type == TAV_PACKET_AUDIO_PCM8) {
// Read PCM8 data directly
job->decoded_pcm = malloc(job->payload_size);
if (job->decoded_pcm && fread(job->decoded_pcm, 1, job->payload_size, input_fp) == job->payload_size) {
job->decoded_samples = job->payload_size / 2; // Stereo
} else {
free(job->decoded_pcm);
job->decoded_pcm = NULL;
job->decoded_samples = 0;
}
}
job->status = DECODE_SLOT_DONE;
}
fclose(input_fp);
return NULL;
}
// Scan file and collect all audio packet metadata
static int collect_audio_packets(decoder_context_t *ctx) {
long current_pos = ftell(ctx->input_fp);
ctx->audio_job_capacity = 1024;
ctx->audio_jobs = malloc(ctx->audio_job_capacity * sizeof(audio_decode_job_t));
if (!ctx->audio_jobs) return -1;
ctx->audio_job_count = 0;
// Scan through file
while (1) {
long packet_pos = ftell(ctx->input_fp);
uint8_t packet_type;
if (fread(&packet_type, 1, 1, ctx->input_fp) != 1) break;
if (packet_type == TAV_PACKET_AUDIO_TAD) {
// TAD packet: [sample_count(2)][payload_size+7(4)][payload...]
uint16_t sample_count;
uint32_t payload_size_plus_7;
if (fread(&sample_count, 2, 1, ctx->input_fp) != 1) break;
if (fread(&payload_size_plus_7, 4, 1, ctx->input_fp) != 1) break;
// Grow array if needed
if (ctx->audio_job_count >= ctx->audio_job_capacity) {
ctx->audio_job_capacity *= 2;
ctx->audio_jobs = realloc(ctx->audio_jobs,
ctx->audio_job_capacity * sizeof(audio_decode_job_t));
if (!ctx->audio_jobs) return -1;
}
// Add job
audio_decode_job_t *job = &ctx->audio_jobs[ctx->audio_job_count++];
job->file_offset = ftell(ctx->input_fp);
job->payload_size = payload_size_plus_7;
job->sample_count = sample_count;
job->packet_type = TAV_PACKET_AUDIO_TAD;
job->decoded_pcm = NULL;
job->decoded_samples = 0;
job->status = DECODE_SLOT_PENDING;
fseek(ctx->input_fp, payload_size_plus_7, SEEK_CUR);
} else if (packet_type == TAV_PACKET_AUDIO_PCM8) {
// PCM8 packet: [size(4)][pcm_data]
uint32_t pcm_size;
if (fread(&pcm_size, 4, 1, ctx->input_fp) != 1) break;
// Grow array if needed
if (ctx->audio_job_count >= ctx->audio_job_capacity) {
ctx->audio_job_capacity *= 2;
ctx->audio_jobs = realloc(ctx->audio_jobs,
ctx->audio_job_capacity * sizeof(audio_decode_job_t));
if (!ctx->audio_jobs) return -1;
}
// Add job
audio_decode_job_t *job = &ctx->audio_jobs[ctx->audio_job_count++];
job->file_offset = ftell(ctx->input_fp);
job->payload_size = pcm_size;
job->sample_count = pcm_size / 2;
job->packet_type = TAV_PACKET_AUDIO_PCM8;
job->decoded_pcm = NULL;
job->decoded_samples = 0;
job->status = DECODE_SLOT_PENDING;
fseek(ctx->input_fp, pcm_size, SEEK_CUR);
} else {
// Skip other packet types
if (packet_type == TAV_PACKET_GOP_UNIFIED) {
uint8_t gop_size;
uint32_t compressed_size;
if (fread(&gop_size, 1, 1, ctx->input_fp) != 1) break;
if (fread(&compressed_size, 4, 1, ctx->input_fp) != 1) break;
fseek(ctx->input_fp, compressed_size, SEEK_CUR);
} else if (packet_type == TAV_PACKET_IFRAME) {
uint32_t compressed_size;
if (fread(&compressed_size, 4, 1, ctx->input_fp) != 1) break;
fseek(ctx->input_fp, compressed_size, SEEK_CUR);
} else if (packet_type == TAV_PACKET_EXTENDED_HDR) {
uint16_t num_pairs;
if (fread(&num_pairs, 2, 1, ctx->input_fp) != 1) break;
for (int i = 0; i < num_pairs; i++) {
uint8_t kv_header[5];
if (fread(kv_header, 1, 5, ctx->input_fp) != 5) break;
uint8_t value_type = kv_header[4];
if (value_type == 0x04) {
fseek(ctx->input_fp, 8, SEEK_CUR);
} else if (value_type == 0x10) {
uint16_t length;
if (fread(&length, 2, 1, ctx->input_fp) != 1) break;
fseek(ctx->input_fp, length, SEEK_CUR);
} else if (value_type <= 0x04) {
int sizes[] = {2, 3, 4, 6, 8};
fseek(ctx->input_fp, sizes[value_type], SEEK_CUR);
}
}
} else if (packet_type == TAV_PACKET_SCREEN_MASK) {
fseek(ctx->input_fp, 4, SEEK_CUR);
} else if (packet_type == TAV_PACKET_GOP_SYNC) {
fseek(ctx->input_fp, 1, SEEK_CUR);
} else if (packet_type == TAV_PACKET_TIMECODE) {
fseek(ctx->input_fp, 8, SEEK_CUR);
} else if (packet_type == TAV_PACKET_SYNC_NTSC ||
packet_type == TAV_PACKET_SYNC) {
// No payload
} else {
// Unknown packet - try to skip by reading size
uint32_t size;
if (fread(&size, 4, 1, ctx->input_fp) != 1) break;
if (size < 1000000) {
fseek(ctx->input_fp, size, SEEK_CUR);
} else {
break; // Likely corrupt
}
}
}
}
// Restore file position
fseek(ctx->input_fp, current_pos, SEEK_SET);
return 0;
}
// Extract audio using multiple threads
static int extract_audio_mt(decoder_context_t *ctx) {
// Collect all audio packet metadata
if (collect_audio_packets(ctx) < 0) {
fprintf(stderr, "Error: Failed to collect audio packets\n");
return -1;
}
if (ctx->audio_job_count == 0) {
// No audio packets found
return 0;
}
if (ctx->verbose) {
printf(" Found %d audio packets\n", ctx->audio_job_count);
}
// Initialize audio threading
ctx->num_audio_threads = ctx->num_threads > 0 ? ctx->num_threads : 1;
ctx->next_audio_job = 0;
ctx->next_audio_write = 0;
ctx->audio_threads_should_exit = 0;
pthread_mutex_init(&ctx->audio_mutex, NULL);
pthread_cond_init(&ctx->audio_cond_job_available, NULL);
// Create worker threads
ctx->audio_worker_threads = malloc(ctx->num_audio_threads * sizeof(pthread_t));
if (!ctx->audio_worker_threads) return -1;
for (int i = 0; i < ctx->num_audio_threads; i++) {
if (pthread_create(&ctx->audio_worker_threads[i], NULL,
audio_worker_thread, ctx) != 0) {
fprintf(stderr, "Error: Failed to create audio worker thread %d\n", i);
return -1;
}
}
// Signal all jobs available
pthread_mutex_lock(&ctx->audio_mutex);
pthread_cond_broadcast(&ctx->audio_cond_job_available);
pthread_mutex_unlock(&ctx->audio_mutex);
// Write decoded audio in order
for (int i = 0; i < ctx->audio_job_count; i++) {
audio_decode_job_t *job = &ctx->audio_jobs[i];
// Wait for this job to complete
while (job->status != DECODE_SLOT_DONE) {
usleep(100);
}
// Write to temp file
if (job->decoded_pcm && job->decoded_samples > 0 && ctx->audio_temp_fp) {
fwrite(job->decoded_pcm, 1, job->decoded_samples * 2, ctx->audio_temp_fp);
ctx->audio_samples_decoded += job->decoded_samples;
}
}
// Signal threads to exit
pthread_mutex_lock(&ctx->audio_mutex);
ctx->audio_threads_should_exit = 1;
pthread_cond_broadcast(&ctx->audio_cond_job_available);
pthread_mutex_unlock(&ctx->audio_mutex);
// Wait for threads to finish
for (int i = 0; i < ctx->num_audio_threads; i++) {
pthread_join(ctx->audio_worker_threads[i], NULL);
}
// Cleanup
for (int i = 0; i < ctx->audio_job_count; i++) {
if (ctx->audio_jobs[i].decoded_pcm) {
free(ctx->audio_jobs[i].decoded_pcm);
}
}
free(ctx->audio_jobs);
free(ctx->audio_worker_threads);
pthread_mutex_destroy(&ctx->audio_mutex);
pthread_cond_destroy(&ctx->audio_cond_job_available);
return 0;
}
// ============================================================================= // =============================================================================
// Main Decoding Loop // Main Decoding Loop
// ============================================================================= // =============================================================================
@@ -1368,11 +1681,21 @@ static int decode_video(decoder_context_t *ctx) {
// Pass 1: Audio extraction // Pass 1: Audio extraction
if (!ctx->no_audio) { if (!ctx->no_audio) {
printf("Pass 1: Extracting audio...\n"); printf("Pass 1: Extracting audio");
while (process_packet(ctx) == 0) { if (ctx->num_threads > 0) {
// Check decode limit printf(" (%d threads)...\n", ctx->num_threads);
if (ctx->decode_limit > 0 && ctx->frames_decoded >= (uint64_t)ctx->decode_limit) { if (extract_audio_mt(ctx) < 0) {
break; fprintf(stderr, "Error: Multithreaded audio extraction failed\n");
return -1;
}
} else {
printf("...\n");
// Fallback to single-threaded
while (process_packet(ctx) == 0) {
// Check decode limit
if (ctx->decode_limit > 0 && ctx->frames_decoded >= (uint64_t)ctx->decode_limit) {
break;
}
} }
} }
@@ -1464,6 +1787,28 @@ static char *generate_output_filename(const char *input_file) {
return output; return output;
} }
/**
* Get number of available CPU cores.
* Returns the number of online processors, or 1 on error.
*/
static int get_available_cpus(void) {
#ifdef _SC_NPROCESSORS_ONLN
long nproc = sysconf(_SC_NPROCESSORS_ONLN);
if (nproc > 0) {
return (int)nproc;
}
#endif
return 1; // Fallback to single core
}
/**
* Get default thread count (cap at 8)
*/
static int get_default_thread_count(void) {
int available = get_available_cpus();
return available < 8 ? available : 8;
}
static void print_usage(const char *program) { static void print_usage(const char *program) {
printf("TAV Decoder - TSVM Advanced Video Codec (Reference Implementation)\n"); printf("TAV Decoder - TSVM Advanced Video Codec (Reference Implementation)\n");
printf("\nUsage: %s -i input.tav [-o output.mkv] [options]\n\n", program); printf("\nUsage: %s -i input.tav [-o output.mkv] [options]\n\n", program);
@@ -1490,6 +1835,9 @@ int main(int argc, char *argv[]) {
decoder_context_t ctx = {0}; decoder_context_t ctx = {0};
// Initialize threading
ctx.num_threads = get_default_thread_count();
// Command-line options // Command-line options
static struct option long_options[] = { static struct option long_options[] = {
{"input", required_argument, 0, 'i'}, {"input", required_argument, 0, 'i'},
@@ -1516,9 +1864,16 @@ int main(int argc, char *argv[]) {
case 'v': case 'v':
ctx.verbose = 1; ctx.verbose = 1;
break; break;
case 't': case 't': { // --threads
ctx.num_threads = atoi(optarg); int threads = atoi(optarg);
if (threads < 0) {
fprintf(stderr, "Error: Thread count must be positive\n");
return 1;
}
// Both 0 and 1 mean single-threaded (use value 0 internally)
ctx.num_threads = (threads <= 1) ? 0 : threads;
break; break;
}
case 1001: case 1001:
ctx.output_raw = 1; ctx.output_raw = 1;
break; break;
@@ -1535,7 +1890,7 @@ int main(int argc, char *argv[]) {
case '?': case '?':
default: default:
print_usage(argv[0]); print_usage(argv[0]);
return (c == 'h') ? 0 : 1; return (c == 'h' || c == '?') ? 0 : 1;
} }
} }

View File

@@ -34,7 +34,6 @@
// Multithreading Structures // Multithreading Structures
// ============================================================================= // =============================================================================
#define MAX_THREADS 16
#define GOP_SLOT_EMPTY 0 #define GOP_SLOT_EMPTY 0
#define GOP_SLOT_READY 1 #define GOP_SLOT_READY 1
#define GOP_SLOT_ENCODING 2 #define GOP_SLOT_ENCODING 2
@@ -263,7 +262,7 @@ static int get_available_cpus(void) {
} }
/** /**
* Get default thread count: min(8, available_cpus) * Get default thread count (cap at 8)
*/ */
static int get_default_thread_count(void) { static int get_default_thread_count(void) {
int available = get_available_cpus(); int available = get_available_cpus();
@@ -2245,7 +2244,7 @@ int main(int argc, char *argv[]) {
cli.audio_quality = -1; // Will match video quality if not specified cli.audio_quality = -1; // Will match video quality if not specified
cli.use_native_audio = 0; // TAD by default cli.use_native_audio = 0; // TAD by default
// Initialize threading defaults: min(8, available CPUs) // Initialize threading
cli.num_threads = get_default_thread_count(); cli.num_threads = get_default_thread_count();
// Command-line options // Command-line options
@@ -2438,8 +2437,8 @@ int main(int argc, char *argv[]) {
break; break;
case 't': { // --threads case 't': { // --threads
int threads = atoi(optarg); int threads = atoi(optarg);
if (threads < 0 || threads > MAX_THREADS) { if (threads < 0) {
fprintf(stderr, "Error: Thread count must be 0-%d\n", MAX_THREADS); fprintf(stderr, "Error: Thread count must be positive\n");
return 1; return 1;
} }
// Both 0 and 1 mean single-threaded (use value 0 internally) // Both 0 and 1 mean single-threaded (use value 0 internally)