diff --git a/terranmon.txt b/terranmon.txt index c104c40..9dcd38b 100644 --- a/terranmon.txt +++ b/terranmon.txt @@ -988,10 +988,11 @@ transmission capability, and region-of-interest coding. The encoder supports following presets: - Sports: use finer temporal quantisation, resulting in better-preserved motion. Less effective as resolution goes up - Anime: instructs the decoder to disable grain synthensis + +2025-12-08 Addendum: TAV-DT should be its own encoder, not a preset - D1/D1PAL: encode to TAV-DT (NTSC/PAL) format. Any non-compliant setup will be ignored and substituted to compliant values - D1P/D1PALP: encode to TAV-DT Progressive (NTSC/PAL) format. Any non-compliant setup will be ignored and substituted to compliant values - ## Packet Structure (some special packets have no payload. See Packet Types for details) uint8 Packet Type uint32 Payload Size @@ -1021,6 +1022,9 @@ The encoder supports following presets: 0x42: Zstd-compressed 16-bit PCM (32 KHz, little endian) 0x43: Zstd-compressed ADPCM (32 KHz) 0x44: TAD (TSVM Advanced Audio) + + 0x50: TAV-DT audio packet with forward error correction + 0x51: TAV-DT video packet with forward error correction 0x70..7F: Reserved for Future Version @@ -1626,22 +1630,57 @@ start of the next packet - Entropy coder: EZBC - Encoder preset: sports preset always enabled - Tiles: monoblock + - GOP size: always 16 frames # Packet Structure uint32 Sync pattern (0xE3537A1F for NTSC Dimension, 0xD193A745 for PAL Dimension) - uint8 Framerate - uint8 Flags - - bit 0 = interlaced - - bit 1 = is NTSC framerate - - bit 4-7 = quality index (0-5) - * Quality indices follow TSVM encoder's - int16 Reserved (zero-fill) - uint32 Total packet size past 16-byte header, modulo 2^32 - !! this value should NOT be used to derive the actual packet size !! 
- uint32 CRC-32 of 12-byte header - uint64 Timecode (0xFD packet) without header byte - * TAD packet (full 0x24 packet) - * TAV packet (full 0x10 or 0x12 packet) + + uint8 Framerate + uint8 Flags + - bit 0 = interlaced + - bit 1 = is NTSC framerate + - bit 4-7 = quality index (0-5) + * Quality indices follow TSVM encoder's + int16 Reserved (zero-fill) + uint32 Total packet size past 16-byte header + uint32 CRC-32 of 12-byte header + + uint64 Timecode in nanoseconds (repeated thrice; bitwise majority) + * TAD with LDPC (0x50) + uint8 Packet type (0x50) + + uint16 Sample Count + uint32 Compressed Size + 14 + + + uint16 Sample Count + uint8 Quantiser Bits + uint32 Compressed Size + + + uint8 FEC Code ID + uint16 FEC Block size or codebook ID + uint16 FEC parity length + + + * Zstd-compressed TAD + * Parity for Zstd-compressed TAD + + * TAV with LDPC (0x51) + uint8 Packet type (0x51) + + uint8 GOP Size (number of frames in this GOP) + uint32 Compressed Size + + + uint8 FEC Code ID + uint16 FEC Block size or codebook ID + uint16 FEC parity length + + + * Zstd-compressed Unified Block Data + * Parity for Zstd-compressed Unified Block Data + # How to sync to the stream 1. Find a sync pattern @@ -1651,7 +1690,6 @@ start of the next packet 5. Check calculated CRC against stored CRC 6. 
If they match, sync to the stream; if not, find a next sync pattern - -------------------------------------------------------------------------------- TSVM Advanced Audio (TAD) Format diff --git a/video_encoder/Makefile b/video_encoder/Makefile index f115b3d..331f0cd 100644 --- a/video_encoder/Makefile +++ b/video_encoder/Makefile @@ -44,7 +44,7 @@ LIBTADDEC_OBJ = lib/libtaddec/decoder_tad.o # ============================================================================= # Source files and targets -TARGETS = clean libs encoder_tav_ref#tev tav tav_decoder tav_inspector tav_dt_decoder +TARGETS = clean libs encoder_tav_ref decoder_tav_ref tav_inspector TAD_TARGETS = encoder_tad decoder_tad LIBRARIES = lib/libtavenc.a lib/libtavdec.a lib/libtadenc.a lib/libtaddec.a TEST_TARGETS = test_mesh_warp test_mesh_roundtrip @@ -67,13 +67,6 @@ tav: src/encoder_tav.c lib/libtadenc/encoder_tad.c encoder_tav_opencv.cpp $(CXX) $(CXXFLAGS) $(OPENCV_CFLAGS) $(ZSTD_CFLAGS) -c encoder_tav_opencv.cpp -o encoder_tav_opencv.o $(CXX) $(DBGFLAGS) -o encoder_tav encoder_tav.o encoder_tad.o encoder_tav_opencv.o $(LIBS) $(OPENCV_LIBS) -# New library-based TAV encoder -tav_decoder: src/decoder_tav.c lib/libtaddec/decoder_tad.c include/decoder_tad.h - rm -f decoder_tav decoder_tav.o decoder_tad.o - $(CC) $(CFLAGS) $(ZSTD_CFLAGS) -DTAD_DECODER_LIB -c lib/libtaddec/decoder_tad.c -o decoder_tad.o - $(CC) $(CFLAGS) $(ZSTD_CFLAGS) -c src/decoder_tav.c -o decoder_tav.o - $(CC) $(DBGFLAGS) -o decoder_tav decoder_tav.o decoder_tad.o $(LIBS) - tav_inspector: tav_inspector.c rm -f tav_inspector $(CC) $(CFLAGS) $(ZSTD_CFLAGS) -o tav_inspector $< $(LIBS) diff --git a/video_encoder/lib/libtavenc/tav_encoder_tile.c b/video_encoder/lib/libtavenc/tav_encoder_tile.c new file mode 100644 index 0000000..b63b362 --- /dev/null +++ b/video_encoder/lib/libtavenc/tav_encoder_tile.c @@ -0,0 +1,159 @@ +/** + * TAV Encoder Library - Tile Processing Implementation + */ + +#include "tav_encoder_tile.h" +#include 
"tav_encoder_dwt.h" +#include +#include + +#define CLAMP(x, min, max) ((x) < (min) ? (min) : ((x) > (max) ? (max) : (x))) + +void tav_extract_padded_tile(const float *frame_y, const float *frame_co, const float *frame_cg, + int frame_width, int frame_height, + int tile_x, int tile_y, + float *padded_y, float *padded_co, float *padded_cg) { + const int core_start_x = tile_x * TAV_TILE_SIZE_X; + const int core_start_y = tile_y * TAV_TILE_SIZE_Y; + + // Process row by row with bulk copying for core region where possible + for (int py = 0; py < TAV_PADDED_TILE_SIZE_Y; py++) { + // Map padded row to source image row + int src_y = core_start_y + py - TAV_TILE_MARGIN; + + // Handle vertical boundary conditions with mirroring + if (src_y < 0) { + src_y = -src_y; + } else if (src_y >= frame_height) { + src_y = frame_height - 1 - (src_y - frame_height); + } + src_y = CLAMP(src_y, 0, frame_height - 1); + + // Calculate source and destination row offsets + const int padded_row_offset = py * TAV_PADDED_TILE_SIZE_X; + const int src_row_offset = src_y * frame_width; + + // Margin boundaries in padded tile + const int core_start_px = TAV_TILE_MARGIN; + const int core_end_px = TAV_TILE_MARGIN + TAV_TILE_SIZE_X; + + // Check if core region is entirely within frame bounds + const int core_src_start_x = core_start_x; + const int core_src_end_x = core_start_x + TAV_TILE_SIZE_X; + + if (core_src_start_x >= 0 && core_src_end_x <= frame_width) { + // Bulk copy core region in one operation + const int src_core_offset = src_row_offset + core_src_start_x; + + memcpy(&padded_y[padded_row_offset + core_start_px], + &frame_y[src_core_offset], + TAV_TILE_SIZE_X * sizeof(float)); + memcpy(&padded_co[padded_row_offset + core_start_px], + &frame_co[src_core_offset], + TAV_TILE_SIZE_X * sizeof(float)); + memcpy(&padded_cg[padded_row_offset + core_start_px], + &frame_cg[src_core_offset], + TAV_TILE_SIZE_X * sizeof(float)); + + // Handle left margin pixels individually + for (int px = 0; px < 
core_start_px; px++) { + int src_x = core_start_x + px - TAV_TILE_MARGIN; + if (src_x < 0) src_x = -src_x; + src_x = CLAMP(src_x, 0, frame_width - 1); + + int src_idx = src_row_offset + src_x; + int padded_idx = padded_row_offset + px; + + padded_y[padded_idx] = frame_y[src_idx]; + padded_co[padded_idx] = frame_co[src_idx]; + padded_cg[padded_idx] = frame_cg[src_idx]; + } + + // Handle right margin pixels individually + for (int px = core_end_px; px < TAV_PADDED_TILE_SIZE_X; px++) { + int src_x = core_start_x + px - TAV_TILE_MARGIN; + if (src_x >= frame_width) { + src_x = frame_width - 1 - (src_x - frame_width); + } + src_x = CLAMP(src_x, 0, frame_width - 1); + + int src_idx = src_row_offset + src_x; + int padded_idx = padded_row_offset + px; + + padded_y[padded_idx] = frame_y[src_idx]; + padded_co[padded_idx] = frame_co[src_idx]; + padded_cg[padded_idx] = frame_cg[src_idx]; + } + } else { + // Fallback: process entire row pixel by pixel (for edge tiles) + for (int px = 0; px < TAV_PADDED_TILE_SIZE_X; px++) { + int src_x = core_start_x + px - TAV_TILE_MARGIN; + + // Handle horizontal boundary conditions with mirroring + if (src_x < 0) { + src_x = -src_x; + } else if (src_x >= frame_width) { + src_x = frame_width - 1 - (src_x - frame_width); + } + src_x = CLAMP(src_x, 0, frame_width - 1); + + int src_idx = src_row_offset + src_x; + int padded_idx = padded_row_offset + px; + + padded_y[padded_idx] = frame_y[src_idx]; + padded_co[padded_idx] = frame_co[src_idx]; + padded_cg[padded_idx] = frame_cg[src_idx]; + } + } + } +} + +// Use existing 2D DWT from tav_encoder_dwt.c +// For padded tiles, we simply call the existing function with tile dimensions + +void tav_dwt_2d_forward_padded_tile(float *tile_data, int levels, int filter_type) { + // Use the existing 2D DWT with padded tile dimensions + tav_dwt_2d_forward(tile_data, TAV_PADDED_TILE_SIZE_X, TAV_PADDED_TILE_SIZE_Y, + levels, filter_type); +} + +void tav_dwt_2d_inverse_padded_tile(float *tile_data, int levels, int 
filter_type) { + // Note: Inverse transform not yet implemented in library for arbitrary dimensions + // For now, this is a placeholder - decoder uses different code path + (void)tile_data; + (void)levels; + (void)filter_type; +} + +void tav_crop_tile_margins(const float *padded_data, float *core_data) { + for (int y = 0; y < TAV_TILE_SIZE_Y; y++) { + const int padded_row = (y + TAV_TILE_MARGIN) * TAV_PADDED_TILE_SIZE_X + TAV_TILE_MARGIN; + const int core_row = y * TAV_TILE_SIZE_X; + memcpy(&core_data[core_row], &padded_data[padded_row], TAV_TILE_SIZE_X * sizeof(float)); + } +} + +void tav_crop_tile_margins_edge(const float *padded_data, float *core_data, + int actual_width, int actual_height) { + for (int y = 0; y < actual_height; y++) { + const int padded_row = (y + TAV_TILE_MARGIN) * TAV_PADDED_TILE_SIZE_X + TAV_TILE_MARGIN; + const int core_row = y * actual_width; + memcpy(&core_data[core_row], &padded_data[padded_row], actual_width * sizeof(float)); + } +} + +void tav_get_tile_dimensions(int frame_width, int frame_height, + int tile_x, int tile_y, + int *tile_width, int *tile_height) { + // Calculate the starting position of this tile + int start_x = tile_x * TAV_TILE_SIZE_X; + int start_y = tile_y * TAV_TILE_SIZE_Y; + + // Calculate how much of the frame is left from this starting position + int remaining_width = frame_width - start_x; + int remaining_height = frame_height - start_y; + + // Tile width is the minimum of standard tile size and remaining width + *tile_width = (remaining_width < TAV_TILE_SIZE_X) ? remaining_width : TAV_TILE_SIZE_X; + *tile_height = (remaining_height < TAV_TILE_SIZE_Y) ? 
remaining_height : TAV_TILE_SIZE_Y; +} diff --git a/video_encoder/lib/libtavenc/tav_encoder_tile.h b/video_encoder/lib/libtavenc/tav_encoder_tile.h new file mode 100644 index 0000000..de7e839 --- /dev/null +++ b/video_encoder/lib/libtavenc/tav_encoder_tile.h @@ -0,0 +1,103 @@ +/** + * TAV Encoder Library - Tile Processing + * + * Functions for padded tile extraction and DWT processing. + * Used when video dimensions exceed monoblock threshold (720x576). + */ + +#ifndef TAV_ENCODER_TILE_H +#define TAV_ENCODER_TILE_H + +#include +#include +#include "../../include/tav_encoder_lib.h" + +// Tile dimensions (from header) +// TAV_TILE_SIZE_X = 640, TAV_TILE_SIZE_Y = 540 +// TAV_PADDED_TILE_SIZE_X = 704, TAV_PADDED_TILE_SIZE_Y = 604 +// TAV_TILE_MARGIN = 32 + +/** + * Extract a padded tile from full-frame YCoCg buffers. + * + * Extracts a tile at position (tile_x, tile_y) with TAV_TILE_MARGIN pixels + * of padding on all sides for seamless DWT processing. Uses symmetric + * extension (mirroring) at frame boundaries. + * + * @param frame_y Full frame Y channel + * @param frame_co Full frame Co channel + * @param frame_cg Full frame Cg channel + * @param frame_width Full frame width + * @param frame_height Full frame height + * @param tile_x Tile X index (0-based) + * @param tile_y Tile Y index (0-based) + * @param padded_y Output: Padded tile Y (PADDED_TILE_SIZE_X * PADDED_TILE_SIZE_Y floats) + * @param padded_co Output: Padded tile Co + * @param padded_cg Output: Padded tile Cg + */ +void tav_extract_padded_tile(const float *frame_y, const float *frame_co, const float *frame_cg, + int frame_width, int frame_height, + int tile_x, int tile_y, + float *padded_y, float *padded_co, float *padded_cg); + +/** + * Apply 2D DWT forward transform to a padded tile. + * + * Uses fixed PADDED_TILE_SIZE dimensions (704x604) for optimal performance. 
+ * + * @param tile_data Tile data (modified in-place) + * @param levels Number of decomposition levels + * @param filter_type Wavelet filter type (0=CDF 5/3, 1=CDF 9/7, etc.) + */ +void tav_dwt_2d_forward_padded_tile(float *tile_data, int levels, int filter_type); + +/** + * Apply 2D DWT inverse transform to a padded tile. + * + * @param tile_data Tile data (modified in-place) + * @param levels Number of decomposition levels + * @param filter_type Wavelet filter type + */ +void tav_dwt_2d_inverse_padded_tile(float *tile_data, int levels, int filter_type); + +/** + * Crop a padded tile to its core region (removing margins). + * + * Extracts the central TAV_TILE_SIZE_X × TAV_TILE_SIZE_Y region from a padded tile. + * + * @param padded_data Padded tile (PADDED_TILE_SIZE_X * PADDED_TILE_SIZE_Y) + * @param core_data Output: Core tile (TILE_SIZE_X * TILE_SIZE_Y) + */ +void tav_crop_tile_margins(const float *padded_data, float *core_data); + +/** + * Crop a padded tile to actual dimensions for edge tiles. + * + * For tiles at the right/bottom edges of a frame, the actual tile may be + * smaller than TILE_SIZE_X × TILE_SIZE_Y. This function handles that case. + * + * @param padded_data Padded tile (PADDED_TILE_SIZE_X * PADDED_TILE_SIZE_Y) + * @param core_data Output: Core tile data + * @param actual_width Actual tile width (may be < TILE_SIZE_X for edge tiles) + * @param actual_height Actual tile height (may be < TILE_SIZE_Y for edge tiles) + */ +void tav_crop_tile_margins_edge(const float *padded_data, float *core_data, + int actual_width, int actual_height); + +/** + * Calculate actual tile dimensions for a given tile position. + * + * Edge tiles may be smaller than the standard tile size. 
+ * + * @param frame_width Full frame width + * @param frame_height Full frame height + * @param tile_x Tile X index + * @param tile_y Tile Y index + * @param tile_width Output: Actual tile width + * @param tile_height Output: Actual tile height + */ +void tav_get_tile_dimensions(int frame_width, int frame_height, + int tile_x, int tile_y, + int *tile_width, int *tile_height); + +#endif // TAV_ENCODER_TILE_H diff --git a/video_encoder/src/decoder_tav.c b/video_encoder/src/decoder_tav.c index 6c1aaa8..7bdb0d1 100644 --- a/video_encoder/src/decoder_tav.c +++ b/video_encoder/src/decoder_tav.c @@ -24,6 +24,8 @@ #include #include #include +#include +#include #include "tav_video_decoder.h" #include "decoder_tad.h" @@ -53,6 +55,31 @@ #define TAV_PACKET_SYNC_NTSC 0xFE #define TAV_PACKET_SYNC 0xFF +// Threading constants +#define MAX_DECODE_THREADS 16 +#define DECODE_SLOT_PENDING 0 +#define DECODE_SLOT_PROCESSING 1 +#define DECODE_SLOT_DONE 2 + +// ============================================================================= +// GOP Decode Job Structure (for multithreading) +// ============================================================================= + +typedef struct { + int job_id; + volatile int status; // DECODE_SLOT_* + + // Input (compressed data read from file) + uint8_t *compressed_data; + uint32_t compressed_size; + int gop_size; + + // Output (decoded frames) + uint8_t **frames; + int frames_allocated; + int decode_result; + +} gop_decode_job_t; // ============================================================================= // TAV Header Structure (32 bytes) @@ -122,6 +149,21 @@ typedef struct { int no_audio; // Skip audio decoding int dump_packets; // Debug: dump packet info + // Threading support + int num_threads; + int num_slots; + gop_decode_job_t *slots; + tav_video_context_t **worker_video_ctx; // Per-thread decoder contexts + pthread_t *worker_threads; + pthread_mutex_t mutex; + pthread_cond_t cond_job_available; + pthread_cond_t cond_slot_free; + 
volatile int threads_should_exit; + volatile int next_write_slot; // Next slot to write to FFmpeg + volatile int next_read_slot; // Next slot for reading from file + volatile int jobs_submitted; + volatile int jobs_completed; + } decoder_context_t; // ============================================================================= @@ -294,6 +336,231 @@ static int spawn_ffmpeg(decoder_context_t *ctx) { return 0; } +// ============================================================================= +// Multithreading Support +// ============================================================================= + +// Worker thread function - decodes GOPs in parallel +static void *decoder_worker_thread(void *arg) { + decoder_context_t *ctx = (decoder_context_t *)arg; + + // Get thread index by finding our thread ID in the array + int thread_idx = -1; + pthread_t self = pthread_self(); + for (int i = 0; i < ctx->num_threads; i++) { + if (pthread_equal(ctx->worker_threads[i], self)) { + thread_idx = i; + break; + } + } + if (thread_idx < 0) thread_idx = 0; // Fallback + + tav_video_context_t *my_video_ctx = ctx->worker_video_ctx[thread_idx]; + + while (1) { + pthread_mutex_lock(&ctx->mutex); + + // Find a pending slot to work on + int slot_idx = -1; + while (slot_idx < 0 && !ctx->threads_should_exit) { + for (int i = 0; i < ctx->num_slots; i++) { + if (ctx->slots[i].status == DECODE_SLOT_PENDING && + ctx->slots[i].compressed_data != NULL) { + slot_idx = i; + ctx->slots[i].status = DECODE_SLOT_PROCESSING; + break; + } + } + if (slot_idx < 0 && !ctx->threads_should_exit) { + pthread_cond_wait(&ctx->cond_job_available, &ctx->mutex); + } + } + + if (ctx->threads_should_exit && slot_idx < 0) { + pthread_mutex_unlock(&ctx->mutex); + break; + } + + pthread_mutex_unlock(&ctx->mutex); + + if (slot_idx < 0) continue; + + gop_decode_job_t *job = &ctx->slots[slot_idx]; + + // Decode GOP using our thread's decoder context + job->decode_result = tav_video_decode_gop( + my_video_ctx, + 
job->compressed_data, + job->compressed_size, + job->gop_size, + job->frames + ); + + // Free compressed data after decoding + free(job->compressed_data); + job->compressed_data = NULL; + + // Mark as done + pthread_mutex_lock(&ctx->mutex); + job->status = DECODE_SLOT_DONE; + ctx->jobs_completed++; + pthread_cond_broadcast(&ctx->cond_slot_free); + pthread_mutex_unlock(&ctx->mutex); + } + + return NULL; +} + +static int init_decoder_threads(decoder_context_t *ctx) { + if (ctx->num_threads <= 0) { + return 0; // Single-threaded mode + } + + // Limit threads + if (ctx->num_threads > MAX_DECODE_THREADS) { + ctx->num_threads = MAX_DECODE_THREADS; + } + + // Number of slots = threads + 2 for pipelining + ctx->num_slots = ctx->num_threads + 2; + + // Allocate slots + ctx->slots = calloc(ctx->num_slots, sizeof(gop_decode_job_t)); + if (!ctx->slots) { + fprintf(stderr, "Error: Failed to allocate decode slots\n"); + return -1; + } + + // Pre-allocate frame buffers for each slot (assuming max GOP size of 32) + size_t frame_size = ctx->header.width * ctx->header.height * 3; + int max_gop_size = 32; + + for (int i = 0; i < ctx->num_slots; i++) { + ctx->slots[i].job_id = -1; + ctx->slots[i].status = DECODE_SLOT_DONE; // Available + ctx->slots[i].frames = malloc(max_gop_size * sizeof(uint8_t*)); + if (!ctx->slots[i].frames) { + fprintf(stderr, "Error: Failed to allocate frame pointers for slot %d\n", i); + return -1; + } + for (int j = 0; j < max_gop_size; j++) { + ctx->slots[i].frames[j] = malloc(frame_size); + if (!ctx->slots[i].frames[j]) { + fprintf(stderr, "Error: Failed to allocate frame buffer for slot %d frame %d\n", i, j); + return -1; + } + } + ctx->slots[i].frames_allocated = max_gop_size; + } + + // Create per-thread video decoder contexts + ctx->worker_video_ctx = malloc(ctx->num_threads * sizeof(tav_video_context_t*)); + if (!ctx->worker_video_ctx) { + fprintf(stderr, "Error: Failed to allocate worker video contexts\n"); + return -1; + } + + tav_video_params_t 
video_params = { + .width = ctx->header.width, + .height = ctx->header.height, + .decomp_levels = ctx->header.decomp_levels, + .temporal_levels = 2, + .wavelet_filter = ctx->header.wavelet_filter, + .temporal_wavelet = 0, + .entropy_coder = ctx->header.entropy_coder, + .channel_layout = ctx->header.channel_layout, + .perceptual_tuning = ctx->perceptual_mode, + .quantiser_y = ctx->header.quantiser_y, + .quantiser_co = ctx->header.quantiser_co, + .quantiser_cg = ctx->header.quantiser_cg, + .encoder_preset = ctx->header.encoder_preset, + .monoblock = 1 + }; + + for (int i = 0; i < ctx->num_threads; i++) { + ctx->worker_video_ctx[i] = tav_video_create(&video_params); + if (!ctx->worker_video_ctx[i]) { + fprintf(stderr, "Error: Failed to create video context for thread %d\n", i); + return -1; + } + } + + // Initialize synchronization primitives + pthread_mutex_init(&ctx->mutex, NULL); + pthread_cond_init(&ctx->cond_job_available, NULL); + pthread_cond_init(&ctx->cond_slot_free, NULL); + ctx->threads_should_exit = 0; + ctx->next_write_slot = 0; + ctx->next_read_slot = 0; + ctx->jobs_submitted = 0; + ctx->jobs_completed = 0; + + // Create worker threads + ctx->worker_threads = malloc(ctx->num_threads * sizeof(pthread_t)); + if (!ctx->worker_threads) { + fprintf(stderr, "Error: Failed to allocate worker threads\n"); + return -1; + } + + for (int i = 0; i < ctx->num_threads; i++) { + if (pthread_create(&ctx->worker_threads[i], NULL, decoder_worker_thread, ctx) != 0) { + fprintf(stderr, "Error: Failed to create worker thread %d\n", i); + return -1; + } + } + + if (ctx->verbose) { + printf("Initialized %d decoder worker threads with %d slots\n", + ctx->num_threads, ctx->num_slots); + } + + return 0; +} + +static void cleanup_decoder_threads(decoder_context_t *ctx) { + if (ctx->num_threads <= 0) return; + + // Signal threads to exit + pthread_mutex_lock(&ctx->mutex); + ctx->threads_should_exit = 1; + pthread_cond_broadcast(&ctx->cond_job_available); + 
pthread_mutex_unlock(&ctx->mutex); + + // Wait for threads to finish + for (int i = 0; i < ctx->num_threads; i++) { + pthread_join(ctx->worker_threads[i], NULL); + } + free(ctx->worker_threads); + ctx->worker_threads = NULL; + + // Free per-thread video contexts + for (int i = 0; i < ctx->num_threads; i++) { + tav_video_free(ctx->worker_video_ctx[i]); + } + free(ctx->worker_video_ctx); + ctx->worker_video_ctx = NULL; + + // Free slots + for (int i = 0; i < ctx->num_slots; i++) { + if (ctx->slots[i].frames) { + for (int j = 0; j < ctx->slots[i].frames_allocated; j++) { + free(ctx->slots[i].frames[j]); + } + free(ctx->slots[i].frames); + } + if (ctx->slots[i].compressed_data) { + free(ctx->slots[i].compressed_data); + } + } + free(ctx->slots); + ctx->slots = NULL; + + // Destroy sync primitives + pthread_mutex_destroy(&ctx->mutex); + pthread_cond_destroy(&ctx->cond_job_available); + pthread_cond_destroy(&ctx->cond_slot_free); +} + // ============================================================================= // Frame Buffer Management // ============================================================================= @@ -710,6 +977,301 @@ static int process_packet(decoder_context_t *ctx) { } } +// ============================================================================= +// Multithreaded Video Decoding (Pass 2) +// ============================================================================= + +// Read a single GOP packet without decoding - for multithreaded submission +static int read_gop_packet_mt(decoder_context_t *ctx, int slot_idx) { + gop_decode_job_t *job = &ctx->slots[slot_idx]; + + // Read GOP size (1 byte) + uint8_t gop_size; + if (fread(&gop_size, 1, 1, ctx->input_fp) != 1) { + return -1; + } + ctx->bytes_read++; + + // Read compressed size (4 bytes) + uint32_t compressed_size; + if (fread(&compressed_size, 4, 1, ctx->input_fp) != 1) { + return -1; + } + ctx->bytes_read += 4; + + // Read compressed data + uint8_t *compressed_data = 
malloc(compressed_size); + if (!compressed_data) { + fprintf(stderr, "Error: Failed to allocate compressed data buffer\n"); + return -1; + } + + if (fread(compressed_data, 1, compressed_size, ctx->input_fp) != compressed_size) { + free(compressed_data); + return -1; + } + ctx->bytes_read += compressed_size; + + // Fill job + job->compressed_data = compressed_data; + job->compressed_size = compressed_size; + job->gop_size = gop_size; + job->decode_result = 0; + + return gop_size; +} + +// Multithreaded pass 2 decoding loop +static int decode_video_pass2_mt(decoder_context_t *ctx) { + size_t frame_size = ctx->header.width * ctx->header.height * 3; + int done = 0; + int job_counter = 0; + + while (!done) { + // Try to submit new jobs to any free slots + pthread_mutex_lock(&ctx->mutex); + + // Find a free slot + int free_slot = -1; + for (int i = 0; i < ctx->num_slots; i++) { + if (ctx->slots[i].status == DECODE_SLOT_DONE && + ctx->slots[i].compressed_data == NULL) { + free_slot = i; + break; + } + } + + pthread_mutex_unlock(&ctx->mutex); + + if (free_slot >= 0) { + // Read next packet + uint8_t packet_type; + if (fread(&packet_type, 1, 1, ctx->input_fp) != 1) { + // EOF + done = 1; + } else { + ctx->bytes_read++; + + if (packet_type == TAV_PACKET_GOP_UNIFIED) { + // Read GOP and submit to slot + int gop_size = read_gop_packet_mt(ctx, free_slot); + if (gop_size > 0) { + pthread_mutex_lock(&ctx->mutex); + ctx->slots[free_slot].job_id = job_counter++; + ctx->slots[free_slot].status = DECODE_SLOT_PENDING; + ctx->jobs_submitted++; + pthread_cond_broadcast(&ctx->cond_job_available); + pthread_mutex_unlock(&ctx->mutex); + } else { + done = 1; + } + } else if (packet_type == TAV_PACKET_IFRAME) { + // For I-frames, decode synchronously (they're rare) + process_iframe_packet(ctx); + } else { + // Skip other packets (audio already extracted in Pass 1) + switch (packet_type) { + case TAV_PACKET_AUDIO_TAD: { + // TAD format: [sample_count(2)][payload_size+7(4)][data...] 
+ uint16_t sample_count; + uint32_t payload_size; + if (fread(&sample_count, 2, 1, ctx->input_fp) != 1) { done = 1; break; } + if (fread(&payload_size, 4, 1, ctx->input_fp) != 1) { done = 1; break; } + ctx->bytes_read += 6; + fseek(ctx->input_fp, payload_size, SEEK_CUR); + ctx->bytes_read += payload_size; + break; + } + case TAV_PACKET_AUDIO_PCM8: + case TAV_PACKET_AUDIO_MP2: + case TAV_PACKET_AUDIO_TRACK: + case TAV_PACKET_SUBTITLE: + case TAV_PACKET_SUBTITLE_TC: + case TAV_PACKET_PFRAME: { + uint32_t size; + if (fread(&size, 4, 1, ctx->input_fp) != 1) { done = 1; break; } + ctx->bytes_read += 4; + fseek(ctx->input_fp, size, SEEK_CUR); + ctx->bytes_read += size; + break; + } + case TAV_PACKET_SCREEN_MASK: + fseek(ctx->input_fp, 4, SEEK_CUR); + ctx->bytes_read += 4; + break; + case TAV_PACKET_GOP_SYNC: + fseek(ctx->input_fp, 1, SEEK_CUR); + ctx->bytes_read += 1; + break; + case TAV_PACKET_TIMECODE: + fseek(ctx->input_fp, 8, SEEK_CUR); + ctx->bytes_read += 8; + break; + case TAV_PACKET_EXTENDED_HDR: { + // Skip extended header + uint16_t num_pairs; + if (fread(&num_pairs, 2, 1, ctx->input_fp) != 1) { done = 1; break; } + ctx->bytes_read += 2; + for (int i = 0; i < num_pairs; i++) { + uint8_t kv_header[5]; + if (fread(kv_header, 1, 5, ctx->input_fp) != 5) break; + ctx->bytes_read += 5; + uint8_t value_type = kv_header[4]; + if (value_type == 0x04) { + fseek(ctx->input_fp, 8, SEEK_CUR); + ctx->bytes_read += 8; + } else if (value_type == 0x10) { + uint16_t length; + if (fread(&length, 2, 1, ctx->input_fp) != 1) break; + ctx->bytes_read += 2; + fseek(ctx->input_fp, length, SEEK_CUR); + ctx->bytes_read += length; + } else if (value_type <= 0x04) { + int sizes[] = {2, 3, 4, 6, 8}; + fseek(ctx->input_fp, sizes[value_type], SEEK_CUR); + ctx->bytes_read += sizes[value_type]; + } + } + break; + } + case TAV_PACKET_SYNC_NTSC: + case TAV_PACKET_SYNC: + // No payload + break; + default: + // Unknown packet, try to skip + { + uint32_t size; + if (fread(&size, 4, 1, ctx->input_fp) 
== 1 && size < 1000000) { + fseek(ctx->input_fp, size, SEEK_CUR); + ctx->bytes_read += 4 + size; + } + } + break; + } + } + } + } + + // Write completed jobs in order + pthread_mutex_lock(&ctx->mutex); + while (1) { + // Find the next job to write (by job_id order) + int write_slot = -1; + int min_job_id = INT32_MAX; + for (int i = 0; i < ctx->num_slots; i++) { + if (ctx->slots[i].status == DECODE_SLOT_DONE && + ctx->slots[i].job_id >= 0 && + ctx->slots[i].job_id < min_job_id) { + // Check if this is the next expected job + if (ctx->slots[i].job_id == ctx->next_write_slot) { + write_slot = i; + break; + } + min_job_id = ctx->slots[i].job_id; + } + } + + if (write_slot < 0) { + // No jobs ready in order, wait if there are pending jobs + if (!done && ctx->jobs_submitted > ctx->next_write_slot) { + // Wait for job to complete + pthread_cond_wait(&ctx->cond_slot_free, &ctx->mutex); + continue; + } + break; + } + + pthread_mutex_unlock(&ctx->mutex); + + // Write frames to FFmpeg + gop_decode_job_t *job = &ctx->slots[write_slot]; + if (job->decode_result >= 0) { + for (int i = 0; i < job->gop_size; i++) { + if (ctx->video_pipe) { + fwrite(job->frames[i], 1, frame_size, ctx->video_pipe); + } + ctx->frames_decoded++; + + if (ctx->decode_limit > 0 && ctx->frames_decoded >= (uint64_t)ctx->decode_limit) { + done = 1; + break; + } + } + ctx->gops_decoded++; + } + + // Mark slot as free + pthread_mutex_lock(&ctx->mutex); + job->job_id = -1; + ctx->next_write_slot++; + pthread_mutex_unlock(&ctx->mutex); + + // Progress + time_t elapsed = time(NULL) - ctx->start_time; + double fps = elapsed > 0 ? 
(double)ctx->frames_decoded / elapsed : 0.0; + printf("\rFrames: %lu | GOPs: %lu | %.1f fps", + ctx->frames_decoded, ctx->gops_decoded, fps); + fflush(stdout); + + pthread_mutex_lock(&ctx->mutex); + } + pthread_mutex_unlock(&ctx->mutex); + + // Check decode limit + if (ctx->decode_limit > 0 && ctx->frames_decoded >= (uint64_t)ctx->decode_limit) { + done = 1; + } + } + + // Wait for remaining jobs to complete + pthread_mutex_lock(&ctx->mutex); + while (ctx->jobs_completed < ctx->jobs_submitted) { + pthread_cond_wait(&ctx->cond_slot_free, &ctx->mutex); + } + + // Write any remaining completed jobs + while (1) { + int write_slot = -1; + for (int i = 0; i < ctx->num_slots; i++) { + if (ctx->slots[i].status == DECODE_SLOT_DONE && + ctx->slots[i].job_id == ctx->next_write_slot) { + write_slot = i; + break; + } + } + + if (write_slot < 0) break; + + pthread_mutex_unlock(&ctx->mutex); + + gop_decode_job_t *job = &ctx->slots[write_slot]; + if (job->decode_result >= 0) { + for (int i = 0; i < job->gop_size; i++) { + if (ctx->video_pipe) { + fwrite(job->frames[i], 1, frame_size, ctx->video_pipe); + } + ctx->frames_decoded++; + } + ctx->gops_decoded++; + } + + pthread_mutex_lock(&ctx->mutex); + job->job_id = -1; + ctx->next_write_slot++; + + time_t elapsed = time(NULL) - ctx->start_time; + double fps = elapsed > 0 ? 
(double)ctx->frames_decoded / elapsed : 0.0; + printf("\rFrames: %lu | GOPs: %lu | %.1f fps", + ctx->frames_decoded, ctx->gops_decoded, fps); + fflush(stdout); + } + pthread_mutex_unlock(&ctx->mutex); + + printf("\n"); + return 0; +} + // ============================================================================= // Main Decoding Loop // ============================================================================= @@ -755,27 +1317,44 @@ static int decode_video(decoder_context_t *ctx) { return -1; } - // Pass 2: Video decoding - uint64_t last_reported = 0; - while (process_packet(ctx) == 0) { - // Progress reporting - show when frames were decoded - if (ctx->frames_decoded != last_reported) { - time_t elapsed = time(NULL) - ctx->start_time; - double fps = elapsed > 0 ? (double)ctx->frames_decoded / elapsed : 0.0; - printf("\rFrames: %lu | GOPs: %lu | %.1f fps", - ctx->frames_decoded, ctx->gops_decoded, fps); - fflush(stdout); - last_reported = ctx->frames_decoded; - } - - // Check decode limit - if (ctx->decode_limit > 0 && ctx->frames_decoded >= (uint64_t)ctx->decode_limit) { - break; + // Initialize decoder threads if multithreaded mode + if (ctx->num_threads > 0) { + if (init_decoder_threads(ctx) < 0) { + fprintf(stderr, "Error: Failed to initialize decoder threads\n"); + return -1; } + printf(" Using %d decoder threads\n", ctx->num_threads); } - printf("\n"); - return 0; + // Pass 2: Video decoding + if (ctx->num_threads > 0) { + // Multithreaded decode + int result = decode_video_pass2_mt(ctx); + cleanup_decoder_threads(ctx); + return result; + } else { + // Single-threaded decode + uint64_t last_reported = 0; + while (process_packet(ctx) == 0) { + // Progress reporting - show when frames were decoded + if (ctx->frames_decoded != last_reported) { + time_t elapsed = time(NULL) - ctx->start_time; + double fps = elapsed > 0 ? 
(double)ctx->frames_decoded / elapsed : 0.0; + printf("\rFrames: %lu | GOPs: %lu | %.1f fps", + ctx->frames_decoded, ctx->gops_decoded, fps); + fflush(stdout); + last_reported = ctx->frames_decoded; + } + + // Check decode limit + if (ctx->decode_limit > 0 && ctx->frames_decoded >= (uint64_t)ctx->decode_limit) { + break; + } + } + + printf("\n"); + return 0; + } } // ============================================================================= @@ -816,6 +1395,7 @@ static void print_usage(const char *program) { printf(" --no-audio Skip audio decoding\n"); printf(" --decode-limit N Decode only first N frames\n"); printf(" --dump-packets Debug: print packet info\n"); + printf(" -t, --threads N Number of decoder threads (0=single-threaded, default)\n"); printf(" -v, --verbose Verbose output\n"); printf(" --help Show this help\n"); printf("\nExamples:\n"); @@ -835,6 +1415,7 @@ int main(int argc, char *argv[]) { {"input", required_argument, 0, 'i'}, {"output", required_argument, 0, 'o'}, {"verbose", no_argument, 0, 'v'}, + {"threads", required_argument, 0, 't'}, {"raw", no_argument, 0, 1001}, {"no-audio", no_argument, 0, 1002}, {"decode-limit", required_argument, 0, 1003}, @@ -844,7 +1425,7 @@ int main(int argc, char *argv[]) { }; int c, option_index = 0; - while ((c = getopt_long(argc, argv, "i:o:vh", long_options, &option_index)) != -1) { + while ((c = getopt_long(argc, argv, "i:o:t:vh", long_options, &option_index)) != -1) { switch (c) { case 'i': ctx.input_file = strdup(optarg); @@ -855,6 +1436,9 @@ int main(int argc, char *argv[]) { case 'v': ctx.verbose = 1; break; + case 't': + ctx.num_threads = atoi(optarg); + break; case 1001: ctx.output_raw = 1; break;