tavenc: multithreaded decoding

This commit is contained in:
minjaesong
2025-12-08 16:07:20 +09:00
parent 34a1f0e3db
commit c6c50c2ebe
5 changed files with 919 additions and 42 deletions

View File

@@ -988,10 +988,11 @@ transmission capability, and region-of-interest coding.
The encoder supports the following presets:
- Sports: use finer temporal quantisation, resulting in better-preserved motion. Less effective as resolution goes up
- Anime: instructs the decoder to disable grain synthesis
2025-12-08 Addendum: TAV-DT should be its own encoder, not preset
- D1/D1PAL: encode to TAV-DT (NTSC/PAL) format. Any non-compliant setup will be ignored and substituted to compliant values
- D1P/D1PALP: encode to TAV-DT Progressive (NTSC/PAL) format. Any non-compliant setup will be ignored and substituted to compliant values
## Packet Structure (some special packets have no payload. See Packet Types for details)
uint8 Packet Type
uint32 Payload Size
@@ -1021,6 +1022,9 @@ The encoder supports following presets:
0x42: Zstd-compressed 16-bit PCM (32 KHz, little endian)
0x43: Zstd-compressed ADPCM (32 KHz)
0x44: TAD (TSVM Advanced Audio)
<TAV-DT specific>
0x50: TAV-DT audio packet with forward error correction
0x51: TAV-DT video packet with forward error correction
<multiplexed video>
0x70..7F: Reserved for Future Version
<Standard metadata payloads>
@@ -1626,9 +1630,11 @@ start of the next packet
- Entropy coder: EZBC
- Encoder preset: sports preset always enabled
- Tiles: monoblock
- GOP size: always 16 frames
# Packet Structure
uint32 Sync pattern (0xE3537A1F for NTSC Dimension, 0xD193A745 for PAL Dimension)
<packet header start>
uint8 Framerate
uint8 Flags
- bit 0 = interlaced
@@ -1636,12 +1642,45 @@ start of the next packet
- bit 4-7 = quality index (0-5)
* Quality indices follow TSVM encoder's
int16 Reserved (zero-fill)
uint32 Total packet size past 16-byte header, modulo 2^32
!! this value should NOT be used to derive the actual packet size !!
uint32 Total packet size past 16-byte header
uint32 CRC-32 of 12-byte header
uint64 Timecode (0xFD packet) without header byte
* TAD packet (full 0x24 packet)
* TAV packet (full 0x10 or 0x12 packet)
<packet header end; encoded with rate 1/2 LPDC>
uint64 Timecode in nanoseconds (repeated thrice; bitwise majority)
* TAD with LPDC (0x50)
uint8 Packet type (0x50)
<TAD header start>
uint16 Sample Count
uint32 Compressed Size + 14
<TAD header end; encoded with rate 1/2 LPDC>
<TAD chunk header start>
uint16 Sample Count
uint8 Quantiser Bits
uint32 Compressed Size
<TAD chunk header end; encoded with rate 1/2 LPDC>
<LPDC header start>
uint8 FEC Code ID
uint16 FEC Block size or codebook ID
uint16 FEC parity length
<LPDC header end; encoded with rate 1/2 LPDC>
<Reed-Solomon block start>
* Zstd-compressed TAD
* Parity for Zstd-compressed TAD
<Reed-Solomon block end>
* TAV with LPDC (0x51)
uint8 Packet type (0x51)
<TAV header start>
uint8 GOP Size (number of frames in this GOP)
uint32 Compressed Size
<TAV header end; encoded with rate 1/2 LPDC>
<LPDC header start>
uint8 FEC Code ID
uint16 FEC Block size or codebook ID
uint16 FEC parity length
<LPDC header end; encoded with rate 1/2 LPDC>
<Reed-Solomon block start>
* Zstd-compressed Unified Block Data
* Parity for Zstd-compressed Unified Block Data
<Reed-Solomon block end>
# How to sync to the stream
1. Find a sync pattern
@@ -1651,7 +1690,6 @@ start of the next packet
5. Check calculated CRC against stored CRC
6. If they match, sync to the stream; if not, find a next sync pattern
--------------------------------------------------------------------------------
TSVM Advanced Audio (TAD) Format

View File

@@ -44,7 +44,7 @@ LIBTADDEC_OBJ = lib/libtaddec/decoder_tad.o
# =============================================================================
# Source files and targets
TARGETS = clean libs encoder_tav_ref#tev tav tav_decoder tav_inspector tav_dt_decoder
TARGETS = clean libs encoder_tav_ref decoder_tav_ref tav_inspector
TAD_TARGETS = encoder_tad decoder_tad
LIBRARIES = lib/libtavenc.a lib/libtavdec.a lib/libtadenc.a lib/libtaddec.a
TEST_TARGETS = test_mesh_warp test_mesh_roundtrip
@@ -67,13 +67,6 @@ tav: src/encoder_tav.c lib/libtadenc/encoder_tad.c encoder_tav_opencv.cpp
$(CXX) $(CXXFLAGS) $(OPENCV_CFLAGS) $(ZSTD_CFLAGS) -c encoder_tav_opencv.cpp -o encoder_tav_opencv.o
$(CXX) $(DBGFLAGS) -o encoder_tav encoder_tav.o encoder_tad.o encoder_tav_opencv.o $(LIBS) $(OPENCV_LIBS)
# New library-based TAV encoder
tav_decoder: src/decoder_tav.c lib/libtaddec/decoder_tad.c include/decoder_tad.h
rm -f decoder_tav decoder_tav.o decoder_tad.o
$(CC) $(CFLAGS) $(ZSTD_CFLAGS) -DTAD_DECODER_LIB -c lib/libtaddec/decoder_tad.c -o decoder_tad.o
$(CC) $(CFLAGS) $(ZSTD_CFLAGS) -c src/decoder_tav.c -o decoder_tav.o
$(CC) $(DBGFLAGS) -o decoder_tav decoder_tav.o decoder_tad.o $(LIBS)
tav_inspector: tav_inspector.c
rm -f tav_inspector
$(CC) $(CFLAGS) $(ZSTD_CFLAGS) -o tav_inspector $< $(LIBS)

View File

@@ -0,0 +1,159 @@
/**
* TAV Encoder Library - Tile Processing Implementation
*/
#include "tav_encoder_tile.h"
#include "tav_encoder_dwt.h"
#include <string.h>
#include <stdlib.h>
#define CLAMP(x, min, max) ((x) < (min) ? (min) : ((x) > (max) ? (max) : (x)))
/**
 * Extract one tile plus a TAV_TILE_MARGIN-pixel apron from the full-frame
 * Y/Co/Cg planes into fixed-size padded buffers
 * (TAV_PADDED_TILE_SIZE_X * TAV_PADDED_TILE_SIZE_Y floats per channel).
 *
 * Samples falling outside the frame are produced by symmetric extension
 * (mirroring about the edge); the CLAMP afterwards is a safety net for
 * mirror indices that still land out of range (very small frames).
 *
 * Per row, the core region is bulk-copied with memcpy when it lies entirely
 * inside the frame and only the left/right margins are filled per pixel;
 * otherwise (right-edge tiles) the whole row falls back to per-pixel copy.
 */
void tav_extract_padded_tile(const float *frame_y, const float *frame_co, const float *frame_cg,
                             int frame_width, int frame_height,
                             int tile_x, int tile_y,
                             float *padded_y, float *padded_co, float *padded_cg) {
    // Top-left corner of this tile's core region, in frame coordinates
    const int core_start_x = tile_x * TAV_TILE_SIZE_X;
    const int core_start_y = tile_y * TAV_TILE_SIZE_Y;
    // Process row by row with bulk copying for core region where possible
    for (int py = 0; py < TAV_PADDED_TILE_SIZE_Y; py++) {
        // Map padded row to source image row
        int src_y = core_start_y + py - TAV_TILE_MARGIN;
        // Handle vertical boundary conditions with mirroring
        if (src_y < 0) {
            src_y = -src_y;
        } else if (src_y >= frame_height) {
            src_y = frame_height - 1 - (src_y - frame_height);
        }
        // Safety clamp in case the mirrored index is still out of range
        src_y = CLAMP(src_y, 0, frame_height - 1);
        // Calculate source and destination row offsets
        const int padded_row_offset = py * TAV_PADDED_TILE_SIZE_X;
        const int src_row_offset = src_y * frame_width;
        // Margin boundaries in padded tile
        const int core_start_px = TAV_TILE_MARGIN;
        const int core_end_px = TAV_TILE_MARGIN + TAV_TILE_SIZE_X;
        // Check if core region is entirely within frame bounds
        const int core_src_start_x = core_start_x;
        const int core_src_end_x = core_start_x + TAV_TILE_SIZE_X;
        if (core_src_start_x >= 0 && core_src_end_x <= frame_width) {
            // Bulk copy core region in one operation (all three channels)
            const int src_core_offset = src_row_offset + core_src_start_x;
            memcpy(&padded_y[padded_row_offset + core_start_px],
                   &frame_y[src_core_offset],
                   TAV_TILE_SIZE_X * sizeof(float));
            memcpy(&padded_co[padded_row_offset + core_start_px],
                   &frame_co[src_core_offset],
                   TAV_TILE_SIZE_X * sizeof(float));
            memcpy(&padded_cg[padded_row_offset + core_start_px],
                   &frame_cg[src_core_offset],
                   TAV_TILE_SIZE_X * sizeof(float));
            // Handle left margin pixels individually (only src_x < 0 can occur here)
            for (int px = 0; px < core_start_px; px++) {
                int src_x = core_start_x + px - TAV_TILE_MARGIN;
                if (src_x < 0) src_x = -src_x;
                src_x = CLAMP(src_x, 0, frame_width - 1);
                int src_idx = src_row_offset + src_x;
                int padded_idx = padded_row_offset + px;
                padded_y[padded_idx] = frame_y[src_idx];
                padded_co[padded_idx] = frame_co[src_idx];
                padded_cg[padded_idx] = frame_cg[src_idx];
            }
            // Handle right margin pixels individually (only src_x >= width can occur here)
            for (int px = core_end_px; px < TAV_PADDED_TILE_SIZE_X; px++) {
                int src_x = core_start_x + px - TAV_TILE_MARGIN;
                if (src_x >= frame_width) {
                    src_x = frame_width - 1 - (src_x - frame_width);
                }
                src_x = CLAMP(src_x, 0, frame_width - 1);
                int src_idx = src_row_offset + src_x;
                int padded_idx = padded_row_offset + px;
                padded_y[padded_idx] = frame_y[src_idx];
                padded_co[padded_idx] = frame_co[src_idx];
                padded_cg[padded_idx] = frame_cg[src_idx];
            }
        } else {
            // Fallback: process entire row pixel by pixel (for edge tiles
            // whose core extends past the right edge of the frame)
            for (int px = 0; px < TAV_PADDED_TILE_SIZE_X; px++) {
                int src_x = core_start_x + px - TAV_TILE_MARGIN;
                // Handle horizontal boundary conditions with mirroring
                if (src_x < 0) {
                    src_x = -src_x;
                } else if (src_x >= frame_width) {
                    src_x = frame_width - 1 - (src_x - frame_width);
                }
                src_x = CLAMP(src_x, 0, frame_width - 1);
                int src_idx = src_row_offset + src_x;
                int padded_idx = padded_row_offset + px;
                padded_y[padded_idx] = frame_y[src_idx];
                padded_co[padded_idx] = frame_co[src_idx];
                padded_cg[padded_idx] = frame_cg[src_idx];
            }
        }
    }
}
// Use existing 2D DWT from tav_encoder_dwt.c
// For padded tiles, we simply call the existing function with tile dimensions
/**
 * Forward 2D DWT over one padded tile, in-place.
 *
 * Thin wrapper around the existing tav_dwt_2d_forward() from
 * tav_encoder_dwt.c, called with the fixed padded-tile dimensions
 * (TAV_PADDED_TILE_SIZE_X x TAV_PADDED_TILE_SIZE_Y).
 */
void tav_dwt_2d_forward_padded_tile(float *tile_data, int levels, int filter_type) {
    // Use the existing 2D DWT with padded tile dimensions
    tav_dwt_2d_forward(tile_data, TAV_PADDED_TILE_SIZE_X, TAV_PADDED_TILE_SIZE_Y,
                       levels, filter_type);
}
/**
 * Inverse 2D DWT over one padded tile — currently a no-op placeholder.
 *
 * The library does not yet implement the inverse transform for arbitrary
 * (padded-tile) dimensions; the decoder uses a different code path, so
 * this function intentionally does nothing. The (void) casts suppress
 * unused-parameter warnings.
 */
void tav_dwt_2d_inverse_padded_tile(float *tile_data, int levels, int filter_type) {
    // Note: Inverse transform not yet implemented in library for arbitrary dimensions
    // For now, this is a placeholder - decoder uses different code path
    (void)tile_data;
    (void)levels;
    (void)filter_type;
}
/**
 * Strip the TAV_TILE_MARGIN apron from a padded tile, producing the
 * tightly packed TAV_TILE_SIZE_X x TAV_TILE_SIZE_Y core region.
 */
void tav_crop_tile_margins(const float *padded_data, float *core_data) {
    /* Start at the top-left sample of the core region, then advance one
     * padded stride per row while packing rows contiguously on output. */
    const float *src = padded_data
                     + TAV_TILE_MARGIN * TAV_PADDED_TILE_SIZE_X
                     + TAV_TILE_MARGIN;
    float *dst = core_data;
    for (int row = 0; row < TAV_TILE_SIZE_Y; row++) {
        memcpy(dst, src, TAV_TILE_SIZE_X * sizeof(float));
        src += TAV_PADDED_TILE_SIZE_X;
        dst += TAV_TILE_SIZE_X;
    }
}
/**
 * Strip the margin apron from a padded tile when the tile's real size is
 * smaller than the standard tile (right/bottom edge tiles): copies only
 * actual_width x actual_height samples into the packed output.
 */
void tav_crop_tile_margins_edge(const float *padded_data, float *core_data,
                                int actual_width, int actual_height) {
    /* Walk the padded buffer from the core's top-left corner, one padded
     * stride per row, emitting actual_width samples per output row. */
    const float *src = padded_data
                     + TAV_TILE_MARGIN * TAV_PADDED_TILE_SIZE_X
                     + TAV_TILE_MARGIN;
    float *dst = core_data;
    for (int row = 0; row < actual_height; row++) {
        memcpy(dst, src, (size_t)actual_width * sizeof(float));
        src += TAV_PADDED_TILE_SIZE_X;
        dst += actual_width;
    }
}
/**
 * Compute the actual pixel dimensions of tile (tile_x, tile_y).
 *
 * Interior tiles are TAV_TILE_SIZE_X x TAV_TILE_SIZE_Y; tiles on the
 * right/bottom edge are truncated to the remaining frame extent.
 *
 * Fix: remaining extent is clamped to zero so that a tile index past the
 * frame edge reports 0x0 instead of negative dimensions (the original
 * returned frame_width - start_x unclamped, which is negative for
 * out-of-range tile indices and would break downstream loops).
 *
 * @param frame_width  Full frame width in pixels
 * @param frame_height Full frame height in pixels
 * @param tile_x       Tile X index (0-based)
 * @param tile_y       Tile Y index (0-based)
 * @param tile_width   Output: actual tile width (0..TAV_TILE_SIZE_X)
 * @param tile_height  Output: actual tile height (0..TAV_TILE_SIZE_Y)
 */
void tav_get_tile_dimensions(int frame_width, int frame_height,
                             int tile_x, int tile_y,
                             int *tile_width, int *tile_height) {
    // Calculate the starting position of this tile
    int start_x = tile_x * TAV_TILE_SIZE_X;
    int start_y = tile_y * TAV_TILE_SIZE_Y;
    // Calculate how much of the frame is left from this starting position,
    // never letting it go negative for out-of-range tile indices
    int remaining_width = frame_width - start_x;
    int remaining_height = frame_height - start_y;
    if (remaining_width < 0) remaining_width = 0;
    if (remaining_height < 0) remaining_height = 0;
    // Tile size is the minimum of the standard tile size and what remains
    *tile_width = (remaining_width < TAV_TILE_SIZE_X) ? remaining_width : TAV_TILE_SIZE_X;
    *tile_height = (remaining_height < TAV_TILE_SIZE_Y) ? remaining_height : TAV_TILE_SIZE_Y;
}

View File

@@ -0,0 +1,103 @@
/**
* TAV Encoder Library - Tile Processing
*
* Functions for padded tile extraction and DWT processing.
* Used when video dimensions exceed monoblock threshold (720x576).
*/
#ifndef TAV_ENCODER_TILE_H
#define TAV_ENCODER_TILE_H
#include <stdint.h>
#include <stddef.h>
#include "../../include/tav_encoder_lib.h"
// Tile dimension macros are defined in tav_encoder_lib.h (included above):
// TAV_TILE_SIZE_X = 640, TAV_TILE_SIZE_Y = 540
// TAV_PADDED_TILE_SIZE_X = 704, TAV_PADDED_TILE_SIZE_Y = 604  (core + 2*margin)
// TAV_TILE_MARGIN = 32
/**
 * Extract a padded tile from full-frame YCoCg buffers.
 *
 * Extracts a tile at position (tile_x, tile_y) with TAV_TILE_MARGIN pixels
 * of padding on all sides for seamless DWT processing. Uses symmetric
 * extension (mirroring) at frame boundaries.
 *
 * @param frame_y      Full frame Y channel (frame_width * frame_height floats)
 * @param frame_co     Full frame Co channel
 * @param frame_cg     Full frame Cg channel
 * @param frame_width  Full frame width
 * @param frame_height Full frame height
 * @param tile_x       Tile X index (0-based)
 * @param tile_y       Tile Y index (0-based)
 * @param padded_y     Output: Padded tile Y (PADDED_TILE_SIZE_X * PADDED_TILE_SIZE_Y floats)
 * @param padded_co    Output: Padded tile Co
 * @param padded_cg    Output: Padded tile Cg
 */
void tav_extract_padded_tile(const float *frame_y, const float *frame_co, const float *frame_cg,
                             int frame_width, int frame_height,
                             int tile_x, int tile_y,
                             float *padded_y, float *padded_co, float *padded_cg);
/**
 * Apply 2D DWT forward transform to a padded tile (in-place).
 *
 * Uses fixed PADDED_TILE_SIZE dimensions (704x604) for optimal performance.
 *
 * @param tile_data   Tile data (modified in-place)
 * @param levels      Number of decomposition levels
 * @param filter_type Wavelet filter type (0=CDF 5/3, 1=CDF 9/7, etc.)
 */
void tav_dwt_2d_forward_padded_tile(float *tile_data, int levels, int filter_type);
/**
 * Apply 2D DWT inverse transform to a padded tile.
 *
 * NOTE: currently a no-op placeholder; the decoder uses a different code
 * path (see implementation).
 *
 * @param tile_data   Tile data (modified in-place)
 * @param levels      Number of decomposition levels
 * @param filter_type Wavelet filter type
 */
void tav_dwt_2d_inverse_padded_tile(float *tile_data, int levels, int filter_type);
/**
 * Crop a padded tile to its core region (removing margins).
 *
 * Extracts the central TAV_TILE_SIZE_X × TAV_TILE_SIZE_Y region from a padded tile.
 *
 * @param padded_data Padded tile (PADDED_TILE_SIZE_X * PADDED_TILE_SIZE_Y)
 * @param core_data   Output: Core tile (TILE_SIZE_X * TILE_SIZE_Y)
 */
void tav_crop_tile_margins(const float *padded_data, float *core_data);
/**
 * Crop a padded tile to actual dimensions for edge tiles.
 *
 * For tiles at the right/bottom edges of a frame, the actual tile may be
 * smaller than TILE_SIZE_X × TILE_SIZE_Y. This function handles that case.
 *
 * @param padded_data   Padded tile (PADDED_TILE_SIZE_X * PADDED_TILE_SIZE_Y)
 * @param core_data     Output: Core tile data (actual_width * actual_height)
 * @param actual_width  Actual tile width (may be < TILE_SIZE_X for edge tiles)
 * @param actual_height Actual tile height (may be < TILE_SIZE_Y for edge tiles)
 */
void tav_crop_tile_margins_edge(const float *padded_data, float *core_data,
                                int actual_width, int actual_height);
/**
 * Calculate actual tile dimensions for a given tile position.
 *
 * Edge tiles may be smaller than the standard tile size.
 *
 * @param frame_width  Full frame width
 * @param frame_height Full frame height
 * @param tile_x       Tile X index
 * @param tile_y       Tile Y index
 * @param tile_width   Output: Actual tile width
 * @param tile_height  Output: Actual tile height
 */
void tav_get_tile_dimensions(int frame_width, int frame_height,
                             int tile_x, int tile_y,
                             int *tile_width, int *tile_height);
#endif // TAV_ENCODER_TILE_H

View File

@@ -24,6 +24,8 @@
#include <unistd.h>
#include <sys/wait.h>
#include <signal.h>
#include <pthread.h>
#include <limits.h>
#include "tav_video_decoder.h"
#include "decoder_tad.h"
@@ -53,6 +55,31 @@
#define TAV_PACKET_SYNC_NTSC 0xFE
#define TAV_PACKET_SYNC 0xFF
// Threading constants
#define MAX_DECODE_THREADS 16
#define DECODE_SLOT_PENDING 0
#define DECODE_SLOT_PROCESSING 1
#define DECODE_SLOT_DONE 2
// =============================================================================
// GOP Decode Job Structure (for multithreading)
// =============================================================================
// One decode slot: carries a compressed GOP from the reader thread to a
// worker and the decoded frames from the worker to the writer. Status
// transitions (PENDING -> PROCESSING -> DONE) happen under the context mutex.
typedef struct {
    int job_id;                // Sequential submission index; -1 = slot not holding a job
    volatile int status;       // DECODE_SLOT_* lifecycle state (guarded by ctx->mutex)
    // Input (compressed data read from file)
    uint8_t *compressed_data;  // Owned by the slot; freed by the worker after decode
    uint32_t compressed_size;  // Byte length of compressed_data
    int gop_size;              // Number of frames in this GOP
    // Output (decoded frames)
    uint8_t **frames;          // Pre-allocated frame buffers (width*height*3 bytes each)
    int frames_allocated;      // Capacity of frames[]
    int decode_result;         // Return value of tav_video_decode_gop(); <0 = failed
} gop_decode_job_t;
// =============================================================================
// TAV Header Structure (32 bytes)
@@ -122,6 +149,21 @@ typedef struct {
int no_audio; // Skip audio decoding
int dump_packets; // Debug: dump packet info
// Threading support
int num_threads;
int num_slots;
gop_decode_job_t *slots;
tav_video_context_t **worker_video_ctx; // Per-thread decoder contexts
pthread_t *worker_threads;
pthread_mutex_t mutex;
pthread_cond_t cond_job_available;
pthread_cond_t cond_slot_free;
volatile int threads_should_exit;
volatile int next_write_slot; // Next slot to write to FFmpeg
volatile int next_read_slot; // Next slot for reading from file
volatile int jobs_submitted;
volatile int jobs_completed;
} decoder_context_t;
// =============================================================================
@@ -294,6 +336,231 @@ static int spawn_ffmpeg(decoder_context_t *ctx) {
return 0;
}
// =============================================================================
// Multithreading Support
// =============================================================================
// Worker thread function - decodes GOPs in parallel
// Worker thread function - decodes GOPs in parallel
/**
 * Worker thread entry point.
 *
 * Loop: claim a PENDING slot (under the mutex), decode its GOP with this
 * thread's private video decoder context, free the compressed input, mark
 * the slot DONE and signal cond_slot_free for the writer. Exits once
 * threads_should_exit is set and no pending slot was claimed.
 */
static void *decoder_worker_thread(void *arg) {
    decoder_context_t *ctx = (decoder_context_t *)arg;
    // Get thread index by finding our thread ID in the array
    // NOTE(review): worker_threads[] is still being populated by
    // init_decoder_threads() while the first workers may already be running,
    // so this scan can miss and fall back to index 0 — two workers would
    // then share one video context. Confirm startup ordering.
    int thread_idx = -1;
    pthread_t self = pthread_self();
    for (int i = 0; i < ctx->num_threads; i++) {
        if (pthread_equal(ctx->worker_threads[i], self)) {
            thread_idx = i;
            break;
        }
    }
    if (thread_idx < 0) thread_idx = 0; // Fallback
    tav_video_context_t *my_video_ctx = ctx->worker_video_ctx[thread_idx];
    while (1) {
        pthread_mutex_lock(&ctx->mutex);
        // Find a pending slot to work on; sleep on cond_job_available if none
        int slot_idx = -1;
        while (slot_idx < 0 && !ctx->threads_should_exit) {
            for (int i = 0; i < ctx->num_slots; i++) {
                if (ctx->slots[i].status == DECODE_SLOT_PENDING &&
                    ctx->slots[i].compressed_data != NULL) {
                    slot_idx = i;
                    // Claim the slot while still holding the mutex
                    ctx->slots[i].status = DECODE_SLOT_PROCESSING;
                    break;
                }
            }
            if (slot_idx < 0 && !ctx->threads_should_exit) {
                pthread_cond_wait(&ctx->cond_job_available, &ctx->mutex);
            }
        }
        if (ctx->threads_should_exit && slot_idx < 0) {
            pthread_mutex_unlock(&ctx->mutex);
            break;
        }
        pthread_mutex_unlock(&ctx->mutex);
        if (slot_idx < 0) continue;
        gop_decode_job_t *job = &ctx->slots[slot_idx];
        // Decode GOP using our thread's decoder context. Safe without the
        // mutex: the slot is PROCESSING, so no other thread touches it.
        job->decode_result = tav_video_decode_gop(
            my_video_ctx,
            job->compressed_data,
            job->compressed_size,
            job->gop_size,
            job->frames
        );
        // Free compressed data after decoding (slot owns it until here)
        free(job->compressed_data);
        job->compressed_data = NULL;
        // Mark as done and wake the writer
        pthread_mutex_lock(&ctx->mutex);
        job->status = DECODE_SLOT_DONE;
        ctx->jobs_completed++;
        pthread_cond_broadcast(&ctx->cond_slot_free);
        pthread_mutex_unlock(&ctx->mutex);
    }
    return NULL;
}
/**
 * Set up the multithreaded decode pipeline: decode slots with pre-allocated
 * frame buffers, one private video decoder context per worker thread, the
 * shared mutex/condvars, and the worker threads themselves.
 *
 * No-op (returns 0) when num_threads <= 0 (single-threaded mode).
 * Returns -1 on any allocation/creation failure.
 *
 * NOTE(review): failure paths return -1 without releasing earlier partial
 * allocations; acceptable only if the caller exits the process on error —
 * confirm, or add goto-style cleanup.
 */
static int init_decoder_threads(decoder_context_t *ctx) {
    if (ctx->num_threads <= 0) {
        return 0; // Single-threaded mode
    }
    // Limit threads
    if (ctx->num_threads > MAX_DECODE_THREADS) {
        ctx->num_threads = MAX_DECODE_THREADS;
    }
    // Number of slots = threads + 2 for pipelining
    ctx->num_slots = ctx->num_threads + 2;
    // Allocate slots
    ctx->slots = calloc(ctx->num_slots, sizeof(gop_decode_job_t));
    if (!ctx->slots) {
        fprintf(stderr, "Error: Failed to allocate decode slots\n");
        return -1;
    }
    // Pre-allocate frame buffers for each slot (assuming max GOP size of 32)
    // NOTE(review): a GOP larger than 32 frames would overflow frames[] —
    // confirm the encoder never emits GOPs above 32 (spec suggests 16 for TAV-DT).
    size_t frame_size = ctx->header.width * ctx->header.height * 3;
    int max_gop_size = 32;
    for (int i = 0; i < ctx->num_slots; i++) {
        ctx->slots[i].job_id = -1;
        ctx->slots[i].status = DECODE_SLOT_DONE; // Available
        ctx->slots[i].frames = malloc(max_gop_size * sizeof(uint8_t*));
        if (!ctx->slots[i].frames) {
            fprintf(stderr, "Error: Failed to allocate frame pointers for slot %d\n", i);
            return -1;
        }
        for (int j = 0; j < max_gop_size; j++) {
            ctx->slots[i].frames[j] = malloc(frame_size);
            if (!ctx->slots[i].frames[j]) {
                fprintf(stderr, "Error: Failed to allocate frame buffer for slot %d frame %d\n", i, j);
                return -1;
            }
        }
        ctx->slots[i].frames_allocated = max_gop_size;
    }
    // Create per-thread video decoder contexts (parameters come from the
    // already-parsed file header; every worker gets an identical context)
    ctx->worker_video_ctx = malloc(ctx->num_threads * sizeof(tav_video_context_t*));
    if (!ctx->worker_video_ctx) {
        fprintf(stderr, "Error: Failed to allocate worker video contexts\n");
        return -1;
    }
    tav_video_params_t video_params = {
        .width = ctx->header.width,
        .height = ctx->header.height,
        .decomp_levels = ctx->header.decomp_levels,
        .temporal_levels = 2,
        .wavelet_filter = ctx->header.wavelet_filter,
        .temporal_wavelet = 0,
        .entropy_coder = ctx->header.entropy_coder,
        .channel_layout = ctx->header.channel_layout,
        .perceptual_tuning = ctx->perceptual_mode,
        .quantiser_y = ctx->header.quantiser_y,
        .quantiser_co = ctx->header.quantiser_co,
        .quantiser_cg = ctx->header.quantiser_cg,
        .encoder_preset = ctx->header.encoder_preset,
        .monoblock = 1
    };
    for (int i = 0; i < ctx->num_threads; i++) {
        ctx->worker_video_ctx[i] = tav_video_create(&video_params);
        if (!ctx->worker_video_ctx[i]) {
            fprintf(stderr, "Error: Failed to create video context for thread %d\n", i);
            return -1;
        }
    }
    // Initialize synchronization primitives
    pthread_mutex_init(&ctx->mutex, NULL);
    pthread_cond_init(&ctx->cond_job_available, NULL);
    pthread_cond_init(&ctx->cond_slot_free, NULL);
    ctx->threads_should_exit = 0;
    ctx->next_write_slot = 0;
    ctx->next_read_slot = 0;
    ctx->jobs_submitted = 0;
    ctx->jobs_completed = 0;
    // Create worker threads (workers scan worker_threads[] for their own id;
    // see NOTE in decoder_worker_thread about startup ordering)
    ctx->worker_threads = malloc(ctx->num_threads * sizeof(pthread_t));
    if (!ctx->worker_threads) {
        fprintf(stderr, "Error: Failed to allocate worker threads\n");
        return -1;
    }
    for (int i = 0; i < ctx->num_threads; i++) {
        if (pthread_create(&ctx->worker_threads[i], NULL, decoder_worker_thread, ctx) != 0) {
            fprintf(stderr, "Error: Failed to create worker thread %d\n", i);
            return -1;
        }
    }
    if (ctx->verbose) {
        printf("Initialized %d decoder worker threads with %d slots\n",
               ctx->num_threads, ctx->num_slots);
    }
    return 0;
}
/**
 * Tear down the multithreaded decode pipeline created by
 * init_decoder_threads(): stop and join workers first, then free decoder
 * contexts, slot buffers, and finally the synchronization primitives.
 *
 * Must only be called after a successful init_decoder_threads(); no-op
 * when num_threads <= 0.
 */
static void cleanup_decoder_threads(decoder_context_t *ctx) {
    if (ctx->num_threads <= 0) return;
    // Signal threads to exit and wake any worker sleeping on the condvar
    pthread_mutex_lock(&ctx->mutex);
    ctx->threads_should_exit = 1;
    pthread_cond_broadcast(&ctx->cond_job_available);
    pthread_mutex_unlock(&ctx->mutex);
    // Wait for threads to finish
    for (int i = 0; i < ctx->num_threads; i++) {
        pthread_join(ctx->worker_threads[i], NULL);
    }
    free(ctx->worker_threads);
    ctx->worker_threads = NULL;
    // Free per-thread video contexts
    for (int i = 0; i < ctx->num_threads; i++) {
        tav_video_free(ctx->worker_video_ctx[i]);
    }
    free(ctx->worker_video_ctx);
    ctx->worker_video_ctx = NULL;
    // Free slots (frame buffers plus any compressed payload never decoded)
    for (int i = 0; i < ctx->num_slots; i++) {
        if (ctx->slots[i].frames) {
            for (int j = 0; j < ctx->slots[i].frames_allocated; j++) {
                free(ctx->slots[i].frames[j]);
            }
            free(ctx->slots[i].frames);
        }
        if (ctx->slots[i].compressed_data) {
            free(ctx->slots[i].compressed_data);
        }
    }
    free(ctx->slots);
    ctx->slots = NULL;
    // Destroy sync primitives (safe now that all workers are joined)
    pthread_mutex_destroy(&ctx->mutex);
    pthread_cond_destroy(&ctx->cond_job_available);
    pthread_cond_destroy(&ctx->cond_slot_free);
}
// =============================================================================
// Frame Buffer Management
// =============================================================================
@@ -710,6 +977,301 @@ static int process_packet(decoder_context_t *ctx) {
}
}
// =============================================================================
// Multithreaded Video Decoding (Pass 2)
// =============================================================================
// Read a single GOP packet without decoding - for multithreaded submission
/**
 * Read one unified-GOP packet payload (the packet-type byte was already
 * consumed by the caller) into decode slot `slot_idx` without decoding it.
 *
 * Wire layout: [gop_size u8][compressed_size u32][compressed data].
 * NOTE(review): the u32 is fread() directly into an integer, so this
 * assumes a little-endian host — confirm it matches the rest of the
 * decoder's byte-order handling.
 *
 * Ownership of the compressed buffer transfers to the slot; the worker
 * thread frees it after decoding.
 *
 * Fix: allocate at least one byte — malloc(0) may legally return NULL,
 * which the original misreported as an allocation failure; also avoids
 * passing a potentially-NULL pointer to fread().
 *
 * @return the GOP's frame count (> 0) on success, -1 on read or
 *         allocation failure.
 */
static int read_gop_packet_mt(decoder_context_t *ctx, int slot_idx) {
    gop_decode_job_t *job = &ctx->slots[slot_idx];
    // Read GOP size (1 byte)
    uint8_t gop_size;
    if (fread(&gop_size, 1, 1, ctx->input_fp) != 1) {
        return -1;
    }
    ctx->bytes_read++;
    // Read compressed size (4 bytes)
    uint32_t compressed_size;
    if (fread(&compressed_size, 4, 1, ctx->input_fp) != 1) {
        return -1;
    }
    ctx->bytes_read += 4;
    // Read compressed data (allocate >= 1 byte so malloc(0)==NULL is not
    // mistaken for out-of-memory)
    uint8_t *compressed_data = malloc(compressed_size > 0 ? compressed_size : 1);
    if (!compressed_data) {
        fprintf(stderr, "Error: Failed to allocate compressed data buffer\n");
        return -1;
    }
    if (compressed_size > 0 &&
        fread(compressed_data, 1, compressed_size, ctx->input_fp) != compressed_size) {
        free(compressed_data);
        return -1;
    }
    ctx->bytes_read += compressed_size;
    // Fill job (decode_result is set by the worker after decoding)
    job->compressed_data = compressed_data;
    job->compressed_size = compressed_size;
    job->gop_size = gop_size;
    job->decode_result = 0;
    return gop_size;
}
// Multithreaded pass 2 decoding loop
// Multithreaded pass 2 decoding loop
/**
 * Pass-2 driver for multithreaded video decoding.
 *
 * Alternates between (a) reading the next packet from the file and, for
 * unified-GOP packets, submitting the compressed payload to a free decode
 * slot for the worker pool, and (b) writing completed GOPs to the FFmpeg
 * pipe in submission order. Audio/subtitle/etc. packets are skipped here
 * (assumed already handled in Pass 1); I-frames are decoded synchronously.
 * After EOF, drains all remaining in-flight jobs in order.
 *
 * Requires a successful init_decoder_threads(). Always returns 0.
 *
 * NOTE(review): despite its name, next_write_slot holds the next *job id*
 * to write, not a slot index — it is compared against slots[i].job_id.
 */
static int decode_video_pass2_mt(decoder_context_t *ctx) {
    size_t frame_size = ctx->header.width * ctx->header.height * 3;
    int done = 0;
    int job_counter = 0; // Monotonic job id assigned at submission
    while (!done) {
        // Try to submit new jobs to any free slots
        pthread_mutex_lock(&ctx->mutex);
        // Find a free slot (DONE with no pending compressed payload)
        int free_slot = -1;
        for (int i = 0; i < ctx->num_slots; i++) {
            if (ctx->slots[i].status == DECODE_SLOT_DONE &&
                ctx->slots[i].compressed_data == NULL) {
                free_slot = i;
                break;
            }
        }
        pthread_mutex_unlock(&ctx->mutex);
        if (free_slot >= 0) {
            // Read next packet
            uint8_t packet_type;
            if (fread(&packet_type, 1, 1, ctx->input_fp) != 1) {
                // EOF
                done = 1;
            } else {
                ctx->bytes_read++;
                if (packet_type == TAV_PACKET_GOP_UNIFIED) {
                    // Read GOP and submit to slot
                    int gop_size = read_gop_packet_mt(ctx, free_slot);
                    if (gop_size > 0) {
                        pthread_mutex_lock(&ctx->mutex);
                        ctx->slots[free_slot].job_id = job_counter++;
                        ctx->slots[free_slot].status = DECODE_SLOT_PENDING;
                        ctx->jobs_submitted++;
                        pthread_cond_broadcast(&ctx->cond_job_available);
                        pthread_mutex_unlock(&ctx->mutex);
                    } else {
                        done = 1;
                    }
                } else if (packet_type == TAV_PACKET_IFRAME) {
                    // For I-frames, decode synchronously (they're rare)
                    process_iframe_packet(ctx);
                } else {
                    // Skip other packets (audio already extracted in Pass 1)
                    switch (packet_type) {
                        case TAV_PACKET_AUDIO_TAD: {
                            // TAD format: [sample_count(2)][payload_size+7(4)][data...]
                            uint16_t sample_count;
                            uint32_t payload_size;
                            if (fread(&sample_count, 2, 1, ctx->input_fp) != 1) { done = 1; break; }
                            if (fread(&payload_size, 4, 1, ctx->input_fp) != 1) { done = 1; break; }
                            ctx->bytes_read += 6;
                            fseek(ctx->input_fp, payload_size, SEEK_CUR);
                            ctx->bytes_read += payload_size;
                            break;
                        }
                        // Generic [size(4)][payload] packets
                        case TAV_PACKET_AUDIO_PCM8:
                        case TAV_PACKET_AUDIO_MP2:
                        case TAV_PACKET_AUDIO_TRACK:
                        case TAV_PACKET_SUBTITLE:
                        case TAV_PACKET_SUBTITLE_TC:
                        case TAV_PACKET_PFRAME: {
                            uint32_t size;
                            if (fread(&size, 4, 1, ctx->input_fp) != 1) { done = 1; break; }
                            ctx->bytes_read += 4;
                            fseek(ctx->input_fp, size, SEEK_CUR);
                            ctx->bytes_read += size;
                            break;
                        }
                        case TAV_PACKET_SCREEN_MASK:
                            fseek(ctx->input_fp, 4, SEEK_CUR);
                            ctx->bytes_read += 4;
                            break;
                        case TAV_PACKET_GOP_SYNC:
                            fseek(ctx->input_fp, 1, SEEK_CUR);
                            ctx->bytes_read += 1;
                            break;
                        case TAV_PACKET_TIMECODE:
                            fseek(ctx->input_fp, 8, SEEK_CUR);
                            ctx->bytes_read += 8;
                            break;
                        case TAV_PACKET_EXTENDED_HDR: {
                            // Skip extended header: [num_pairs(2)] then per pair
                            // [key(4)][value_type(1)][value...]
                            uint16_t num_pairs;
                            if (fread(&num_pairs, 2, 1, ctx->input_fp) != 1) { done = 1; break; }
                            ctx->bytes_read += 2;
                            for (int i = 0; i < num_pairs; i++) {
                                uint8_t kv_header[5];
                                if (fread(kv_header, 1, 5, ctx->input_fp) != 5) break;
                                ctx->bytes_read += 5;
                                uint8_t value_type = kv_header[4];
                                if (value_type == 0x04) {
                                    fseek(ctx->input_fp, 8, SEEK_CUR);
                                    ctx->bytes_read += 8;
                                } else if (value_type == 0x10) {
                                    // Length-prefixed value
                                    uint16_t length;
                                    if (fread(&length, 2, 1, ctx->input_fp) != 1) break;
                                    ctx->bytes_read += 2;
                                    fseek(ctx->input_fp, length, SEEK_CUR);
                                    ctx->bytes_read += length;
                                } else if (value_type <= 0x04) {
                                    // Only 0x00..0x03 reach here (0x04 handled above)
                                    int sizes[] = {2, 3, 4, 6, 8};
                                    fseek(ctx->input_fp, sizes[value_type], SEEK_CUR);
                                    ctx->bytes_read += sizes[value_type];
                                }
                            }
                            break;
                        }
                        case TAV_PACKET_SYNC_NTSC:
                        case TAV_PACKET_SYNC:
                            // No payload
                            break;
                        default:
                            // Unknown packet, try to skip (size sanity-capped at 1MB)
                            {
                                uint32_t size;
                                if (fread(&size, 4, 1, ctx->input_fp) == 1 && size < 1000000) {
                                    fseek(ctx->input_fp, size, SEEK_CUR);
                                    ctx->bytes_read += 4 + size;
                                }
                            }
                            break;
                    }
                }
            }
        }
        // Write completed jobs in order
        pthread_mutex_lock(&ctx->mutex);
        while (1) {
            // Find the next job to write (by job_id order)
            int write_slot = -1;
            int min_job_id = INT32_MAX; // NOTE(review): tracked but never used
            for (int i = 0; i < ctx->num_slots; i++) {
                if (ctx->slots[i].status == DECODE_SLOT_DONE &&
                    ctx->slots[i].job_id >= 0 &&
                    ctx->slots[i].job_id < min_job_id) {
                    // Check if this is the next expected job
                    if (ctx->slots[i].job_id == ctx->next_write_slot) {
                        write_slot = i;
                        break;
                    }
                    min_job_id = ctx->slots[i].job_id;
                }
            }
            if (write_slot < 0) {
                // No jobs ready in order, wait if there are pending jobs
                if (!done && ctx->jobs_submitted > ctx->next_write_slot) {
                    // Wait for job to complete
                    pthread_cond_wait(&ctx->cond_slot_free, &ctx->mutex);
                    continue;
                }
                break;
            }
            // Drop the lock for the (slow) pipe write: the slot is DONE and
            // owned by the writer until job_id is reset below
            pthread_mutex_unlock(&ctx->mutex);
            // Write frames to FFmpeg
            gop_decode_job_t *job = &ctx->slots[write_slot];
            if (job->decode_result >= 0) {
                for (int i = 0; i < job->gop_size; i++) {
                    if (ctx->video_pipe) {
                        fwrite(job->frames[i], 1, frame_size, ctx->video_pipe);
                    }
                    ctx->frames_decoded++;
                    // NOTE(review): hitting the limit here stops mid-GOP;
                    // remaining decoded frames of this GOP are dropped
                    if (ctx->decode_limit > 0 && ctx->frames_decoded >= (uint64_t)ctx->decode_limit) {
                        done = 1;
                        break;
                    }
                }
                ctx->gops_decoded++;
            }
            // Mark slot as free (job_id = -1 lets the reader reuse it)
            pthread_mutex_lock(&ctx->mutex);
            job->job_id = -1;
            ctx->next_write_slot++;
            pthread_mutex_unlock(&ctx->mutex);
            // Progress
            time_t elapsed = time(NULL) - ctx->start_time;
            double fps = elapsed > 0 ? (double)ctx->frames_decoded / elapsed : 0.0;
            printf("\rFrames: %lu | GOPs: %lu | %.1f fps",
                   ctx->frames_decoded, ctx->gops_decoded, fps);
            fflush(stdout);
            pthread_mutex_lock(&ctx->mutex);
        }
        pthread_mutex_unlock(&ctx->mutex);
        // Check decode limit
        if (ctx->decode_limit > 0 && ctx->frames_decoded >= (uint64_t)ctx->decode_limit) {
            done = 1;
        }
    }
    // Wait for remaining jobs to complete
    pthread_mutex_lock(&ctx->mutex);
    while (ctx->jobs_completed < ctx->jobs_submitted) {
        pthread_cond_wait(&ctx->cond_slot_free, &ctx->mutex);
    }
    // Write any remaining completed jobs (in job_id order)
    while (1) {
        int write_slot = -1;
        for (int i = 0; i < ctx->num_slots; i++) {
            if (ctx->slots[i].status == DECODE_SLOT_DONE &&
                ctx->slots[i].job_id == ctx->next_write_slot) {
                write_slot = i;
                break;
            }
        }
        if (write_slot < 0) break;
        pthread_mutex_unlock(&ctx->mutex);
        gop_decode_job_t *job = &ctx->slots[write_slot];
        if (job->decode_result >= 0) {
            for (int i = 0; i < job->gop_size; i++) {
                if (ctx->video_pipe) {
                    fwrite(job->frames[i], 1, frame_size, ctx->video_pipe);
                }
                ctx->frames_decoded++;
            }
            ctx->gops_decoded++;
        }
        pthread_mutex_lock(&ctx->mutex);
        job->job_id = -1;
        ctx->next_write_slot++;
        time_t elapsed = time(NULL) - ctx->start_time;
        double fps = elapsed > 0 ? (double)ctx->frames_decoded / elapsed : 0.0;
        printf("\rFrames: %lu | GOPs: %lu | %.1f fps",
               ctx->frames_decoded, ctx->gops_decoded, fps);
        fflush(stdout);
    }
    pthread_mutex_unlock(&ctx->mutex);
    printf("\n");
    return 0;
}
// =============================================================================
// Main Decoding Loop
// =============================================================================
@@ -755,7 +1317,23 @@ static int decode_video(decoder_context_t *ctx) {
return -1;
}
// Initialize decoder threads if multithreaded mode
if (ctx->num_threads > 0) {
if (init_decoder_threads(ctx) < 0) {
fprintf(stderr, "Error: Failed to initialize decoder threads\n");
return -1;
}
printf(" Using %d decoder threads\n", ctx->num_threads);
}
// Pass 2: Video decoding
if (ctx->num_threads > 0) {
// Multithreaded decode
int result = decode_video_pass2_mt(ctx);
cleanup_decoder_threads(ctx);
return result;
} else {
// Single-threaded decode
uint64_t last_reported = 0;
while (process_packet(ctx) == 0) {
// Progress reporting - show when frames were decoded
@@ -777,6 +1355,7 @@ static int decode_video(decoder_context_t *ctx) {
printf("\n");
return 0;
}
}
// =============================================================================
// Usage and Main
@@ -816,6 +1395,7 @@ static void print_usage(const char *program) {
printf(" --no-audio Skip audio decoding\n");
printf(" --decode-limit N Decode only first N frames\n");
printf(" --dump-packets Debug: print packet info\n");
printf(" -t, --threads N Number of decoder threads (0=single-threaded, default)\n");
printf(" -v, --verbose Verbose output\n");
printf(" --help Show this help\n");
printf("\nExamples:\n");
@@ -835,6 +1415,7 @@ int main(int argc, char *argv[]) {
{"input", required_argument, 0, 'i'},
{"output", required_argument, 0, 'o'},
{"verbose", no_argument, 0, 'v'},
{"threads", required_argument, 0, 't'},
{"raw", no_argument, 0, 1001},
{"no-audio", no_argument, 0, 1002},
{"decode-limit", required_argument, 0, 1003},
@@ -844,7 +1425,7 @@ int main(int argc, char *argv[]) {
};
int c, option_index = 0;
while ((c = getopt_long(argc, argv, "i:o:vh", long_options, &option_index)) != -1) {
while ((c = getopt_long(argc, argv, "i:o:t:vh", long_options, &option_index)) != -1) {
switch (c) {
case 'i':
ctx.input_file = strdup(optarg);
@@ -855,6 +1436,9 @@ int main(int argc, char *argv[]) {
case 'v':
ctx.verbose = 1;
break;
case 't':
ctx.num_threads = atoi(optarg);
break;
case 1001:
ctx.output_raw = 1;
break;