/* MtCoder.c -- Multi-thread Coder
2021-02-09 : Igor Pavlov : Public domain */

#include "Precomp.h"

#include "MtCoder.h"

#ifndef _7ZIP_ST

/*
  ICompressProgress callback of CMtProgressThunk.

  The thunk stores the last (inSize, outSize) values it was called with and
  forwards only the differences to MtProgress_ProgressAdd().
  If inSize (or outSize) is (UInt64)(Int64)-1, that counter is skipped:
  the stored value is left unchanged and 0 is forwarded for it.

  Returns the result of MtProgress_ProgressAdd().
*/
static SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
{
  CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
  UInt64 inSize2 = 0;
  UInt64 outSize2 = 0;
  if (inSize != (UInt64)(Int64)-1)
  {
    inSize2 = inSize - thunk->inSize;
    thunk->inSize = inSize;
  }
  if (outSize != (UInt64)(Int64)-1)
  {
    outSize2 = outSize - thunk->outSize;
    thunk->outSize = outSize;
  }
  return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
}


/* Installs MtProgressThunk_Progress() as the Progress entry of the thunk's vtable. */
void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
{
  p->vt.Progress = MtProgressThunk_Progress;
}


/*
  Returns SZ_ERROR_THREAD from the enclosing function if (x) is nonzero.
  The expansion is a braced block, so this file uses it both with and
  without a trailing semicolon.
*/
#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }


/*
  Makes sure the event exists:
    - if it is already created, it is reset with Event_Reset();
    - otherwise it is created with AutoResetEvent_CreateNotSignaled().
  Returns the WRes of whichever call was made.
*/
static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
{
  if (Event_IsCreated(p))
    return Event_Reset(p);
  return AutoResetEvent_CreateNotSignaled(p);
}


static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);


/*
  Prepares t->startEvent, clears t->stop, creates the thread running
  ThreadFunc() if it was not created before, and then sets t->startEvent.
  Each step is executed only if the previous one returned 0.

  Returns SZ_OK on success, or MY_SRes_HRESULT_FROM_WRes(wres) for the
  first failing step.
*/
static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
{
  WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
  if (wres == 0)
  {
    t->stop = False;
    /* an already created thread is reused: only its start event is set again */
    if (!Thread_WasCreated(&t->thread))
      wres = Thread_Create(&t->thread, ThreadFunc, t);
    if (wres == 0)
      wres = Event_Set(&t->startEvent);
  }
  if (wres == 0)
    return SZ_OK;
  return MY_SRes_HRESULT_FROM_WRes(wres);
}


/*
  Releases everything owned by one CMtCoderThread:
    - if the thread was created: sets t->stop, sets startEvent so that
      ThreadFunc() can observe the stop flag, then calls Thread_Wait_Close();
    - closes startEvent;
    - frees t->inBuf with mtCoder->allocBig and sets it to NULL.
  Return values of the event / thread calls are not checked here.
*/
static void MtCoderThread_Destruct(CMtCoderThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    t->stop = 1;
    Event_Set(&t->startEvent);
    Thread_Wait_Close(&t->thread);
  }

  Event_Close(&t->startEvent);

  if (t->inBuf)
  {
    ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
    t->inBuf = NULL;
  }
}


/*
  Calls ISeqInStream_Read() repeatedly until the requested amount is read.

  *processedSize:
    in  : number of bytes to read into (data)
    out : number of bytes that were actually stored

  The loop stops when the full size was read, when a read call reports
  0 bytes, or when a read call returns an error.
  The byte count reported by a failing call is added to *processedSize
  before the error is returned (RINOK is placed after the accumulation).

  Returns SZ_OK, or the error code of the failing read call.
*/
static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
{
  size_t size = *processedSize;
  *processedSize = 0;
  while (size != 0)
  {
    size_t cur = size;
    SRes res = ISeqInStream_Read(stream, data, &cur);
    *processedSize += cur;
    data += cur;
    size -= cur;
    RINOK(res);
    if (cur == 0)
      return SZ_OK;
  }
  return SZ_OK;
}


/*
  ThreadFunc2() returns:
    SZ_OK           - in all normal cases (even for stream error or memory allocation error)
    SZ_ERROR_THREAD - in case of failure in system synch function

  Each iteration of the outer loop handles one block:
    1) read stage: executed between Event_Wait(readEvent) and
       Event_Set(readEvent). It takes the input for the block, takes one
       count from blocksSemaphore, reserves slot (bi) in mtc->blocks[]
       and optionally starts one more thread;
    2) code stage: takes an output buffer index from the free list and calls
       mtCallback->Code();
    3) write stage: either signals writeEvents[bi] (MTCODER__USE_WRITE_THREAD),
       or this thread writes the block itself if it is the block that
       mtc->writeIndex is waiting for.

  Stream errors, allocation errors and callback errors are reported via
  MtProgress_SetError() / block->res / readRes / writeRes, not via
  the return value.
*/

static SRes ThreadFunc2(CMtCoderThread *t)
{
  CMtCoder *mtc = t->mtCoder;

  for (;;)
  {
    unsigned bi;
    SRes res;
    SRes res2;
    BoolInt finished;
    unsigned bufIndex;
    size_t size;
    const Byte *inData;
    UInt64 readProcessed = 0;

    RINOK_THREAD(Event_Wait(&mtc->readEvent))

    /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */

    if (mtc->stopReading)
    {
      /* no more input: pass readEvent on to the next waiting thread and exit */
      return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
    }

    /* do not read anything if an error was already registered in mtProgress */
    res = MtProgress_GetError(&mtc->mtProgress);

    size = 0;
    inData = NULL;
    finished = True;

    if (res == SZ_OK)
    {
      size = mtc->blockSize;
      if (mtc->inStream)
      {
        /* stream mode: input is read into the per-thread buffer,
           that is allocated on first use */
        if (!t->inBuf)
        {
          t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
          if (!t->inBuf)
            res = SZ_ERROR_MEM;
        }
        if (res == SZ_OK)
        {
          res = FullRead(mtc->inStream, t->inBuf, &size);
          readProcessed = mtc->readProcessed + size;
          mtc->readProcessed = readProcessed;
        }
        if (res != SZ_OK)
        {
          mtc->readRes = res;
          /* after reading error - we can stop encoding of previous blocks */
          MtProgress_SetError(&mtc->mtProgress, res);
        }
        else
          /* a block shorter than blockSize is treated as the last block */
          finished = (size != mtc->blockSize);
      }
      else
      {
        /* memory mode: the block is a slice of mtc->inData, nothing is copied */
        size_t rem;
        readProcessed = mtc->readProcessed;
        rem = mtc->inDataSize - (size_t)readProcessed;
        if (size > rem)
          size = rem;
        inData = mtc->inData + (size_t)readProcessed;
        readProcessed += size;
        mtc->readProcessed = readProcessed;
        finished = (mtc->inDataSize == (size_t)readProcessed);
      }
    }

    /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */

    res2 = SZ_OK;

    /* res2 keeps the synchronization failure separately from (res),
       because only res2 is returned from this function (see below) */
    if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
    {
      res2 = SZ_ERROR_THREAD;
      if (res == SZ_OK)
      {
        res = res2;
        // MtProgress_SetError(&mtc->mtProgress, res);
      }
    }

    /* reserve slot (bi) in mtc->blocks[]; blockIndex wraps at numBlocksMax */
    bi = mtc->blockIndex;

    if (++mtc->blockIndex >= mtc->numBlocksMax)
      mtc->blockIndex = 0;

    /* (unsigned)(int)-1 means: no output buffer was taken for this block */
    bufIndex = (unsigned)(int)-1;

    if (res == SZ_OK)
      res = MtProgress_GetError(&mtc->mtProgress);

    if (res != SZ_OK)
      finished = True;

    if (!finished)
    {
      /* start one more thread if the limit is not reached and
         the data read so far is not equal to expectedDataSize */
      if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
          && mtc->expectedDataSize != readProcessed)
      {
        res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
        if (res == SZ_OK)
          mtc->numStartedThreads++;
        else
        {
          MtProgress_SetError(&mtc->mtProgress, res);
          finished = True;
        }
      }
    }

    if (finished)
      mtc->stopReading = True;

    /* end of read stage: let the next thread pass Event_Wait(readEvent) */
    RINOK_THREAD(Event_Set(&mtc->readEvent))

    if (res2 != SZ_OK)
      return res2;

    if (res == SZ_OK)
    {
      /* pop an output buffer index from the free list (protected by mtc->cs) */
      CriticalSection_Enter(&mtc->cs);
      bufIndex = mtc->freeBlockHead;
      mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
      CriticalSection_Leave(&mtc->cs);

      res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
          mtc->inStream ? t->inBuf : inData, size, finished);

      // MtProgress_Reinit(&mtc->mtProgress, t->index);

      if (res != SZ_OK)
        MtProgress_SetError(&mtc->mtProgress, res);
    }

    /* publish the result of this block for the write stage */
    {
      CMtCoderBlock *block = &mtc->blocks[bi];
      block->res = res;
      block->bufIndex = bufIndex;
      block->finished = finished;
    }

    #ifdef MTCODER__USE_WRITE_THREAD
      /* the writing loop in MtCoder_Code() waits for writeEvents[bi] */
      RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
    #else
    {
      unsigned wi;
      {
        /* If writeIndex == bi, this thread becomes the writer, and writeIndex
           is set to -1 while it writes.
           Otherwise the block is only marked as ready; the current writer
           will pick it up later. */
        CriticalSection_Enter(&mtc->cs);
        wi = mtc->writeIndex;
        if (wi == bi)
          mtc->writeIndex = (unsigned)(int)-1;
        else
          mtc->ReadyBlocks[bi] = True;
        CriticalSection_Leave(&mtc->cs);
      }

      if (wi != bi)
      {
        if (res != SZ_OK || finished)
          return 0;
        continue;
      }

      /* a previous write error prevents further Write() calls */
      if (mtc->writeRes != SZ_OK)
        res = mtc->writeRes;

      /* writer loop: writes block (wi) and then all following blocks
         that are already marked in ReadyBlocks[] */
      for (;;)
      {
        if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
        {
          res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
          if (res != SZ_OK)
          {
            mtc->writeRes = res;
            MtProgress_SetError(&mtc->mtProgress, res);
          }
        }

        if (++wi >= mtc->numBlocksMax)
          wi = 0;
        {
          BoolInt isReady;

          CriticalSection_Enter(&mtc->cs);

          /* return the output buffer to the free list */
          if (bufIndex != (unsigned)(int)-1)
          {
            mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
            mtc->freeBlockHead = bufIndex;
          }

          /* if the next block is not ready, publish (wi) as the new writeIndex,
             so the thread that finishes that block becomes the writer */
          isReady = mtc->ReadyBlocks[wi];

          if (isReady)
            mtc->ReadyBlocks[wi] = False;
          else
            mtc->writeIndex = wi;

          CriticalSection_Leave(&mtc->cs);

          /* the slot in mtc->blocks[] is free now */
          RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))

          if (!isReady)
            break;
        }

        /* continue with the next block, that was coded by another thread */
        {
          CMtCoderBlock *block = &mtc->blocks[wi];
          if (res == SZ_OK && block->res != SZ_OK)
            res = block->res;
          bufIndex = block->bufIndex;
          finished = block->finished;
        }
      }
    }
    #endif

    if (finished || res != SZ_OK)
      return 0;
  }
}


/*
  Thread entry point. (pp) is the CMtCoderThread of this thread.

  The thread waits for startEvent in a loop:
    - if t->stop is set after wake-up, the thread exits with 0;
    - otherwise it runs ThreadFunc2() and registers a non-SZ_OK result
      with MtProgress_SetError().
  Without MTCODER__USE_WRITE_THREAD it then increments numFinishedThreads,
  and the thread that brings the counter to numStartedThreads sets
  finishedEvent, which MtCoder_Code() is waiting for.

  Returns SZ_ERROR_THREAD (cast to the thread return type) if Event_Wait()
  or Event_Set() fails.
*/
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
{
  CMtCoderThread *t = (CMtCoderThread *)pp;

  for (;;)
  {
    if (Event_Wait(&t->startEvent) != 0)
      return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
    if (t->stop)
      return 0;
    {
      SRes res = ThreadFunc2(t);
      CMtCoder *mtc = t->mtCoder;
      if (res != SZ_OK)
      {
        MtProgress_SetError(&mtc->mtProgress, res);
      }

      #ifndef MTCODER__USE_WRITE_THREAD
      {
        unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
        if (numFinished == mtc->numStartedThreads)
          if (Event_Set(&mtc->finishedEvent) != 0)
            return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
      }
      #endif
    }
  }
}


/*
  Sets all fields of CMtCoder to their initial values:
    - settings are zeroed / set to NULL; expectedDataSize is (UInt64)(Int64)-1;
    - events, the semaphore and thread objects are constructed
      (the Create calls for them are made later, in this file);
    - each threads[i] gets its back pointer (mtCoder) and its index;
    - both critical sections (p->cs and p->mtProgress.cs) are initialized.
*/
void MtCoder_Construct(CMtCoder *p)
{
  unsigned i;

  p->blockSize = 0;
  p->numThreadsMax = 0;
  p->expectedDataSize = (UInt64)(Int64)-1;

  p->inStream = NULL;
  p->inData = NULL;
  p->inDataSize = 0;

  p->progress = NULL;
  p->allocBig = NULL;

  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  p->allocatedBufsSize = 0;

  Event_Construct(&p->readEvent);
  Semaphore_Construct(&p->blocksSemaphore);

  for (i = 0; i < MTCODER__THREADS_MAX; i++)
  {
    CMtCoderThread *t = &p->threads[i];
    t->mtCoder = p;
    t->index = i;
    t->inBuf = NULL;
    t->stop = False;
    Event_Construct(&t->startEvent);
    Thread_Construct(&t->thread);
  }

  #ifdef MTCODER__USE_WRITE_THREAD
    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
      Event_Construct(&p->writeEvents[i]);
  #else
    Event_Construct(&p->finishedEvent);
  #endif

  CriticalSection_Init(&p->cs);
  CriticalSection_Init(&p->mtProgress.cs);
}


/*
  Stops and destructs all threads (MtCoderThread_Destruct() also frees
  their input buffers), then closes readEvent, blocksSemaphore and
  the write events / finishedEvent.
  The critical sections are not deleted here; that is done in MtCoder_Destruct().
*/
static void MtCoder_Free(CMtCoder *p)
{
  unsigned i;

  /*
  p->stopReading = True;
  if (Event_IsCreated(&p->readEvent))
    Event_Set(&p->readEvent);
  */

  for (i = 0; i < MTCODER__THREADS_MAX; i++)
    MtCoderThread_Destruct(&p->threads[i]);

  Event_Close(&p->readEvent);
  Semaphore_Close(&p->blocksSemaphore);

  #ifdef MTCODER__USE_WRITE_THREAD
    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
      Event_Close(&p->writeEvents[i]);
  #else
    Event_Close(&p->finishedEvent);
  #endif
}


/* Calls MtCoder_Free() and then deletes both critical sections. */
void MtCoder_Destruct(CMtCoder *p)
{
  MtCoder_Free(p);

  CriticalSection_Delete(&p->cs);
  CriticalSection_Delete(&p->mtProgress.cs);
}


/*
  Runs one multi-threaded coding session with the settings stored in (p).

  Steps:
    - numThreads is numThreadsMax, limited to MTCODER__THREADS_MAX;
    - numBlocksMax is derived from numThreads, increased by one for each of
      the thresholds (blockSize < 2^26, 2^24, 2^22), and limited to
      MTCODER__BLOCKS_MAX;
    - per-thread input buffers are freed if blockSize differs from
      the size they were allocated for (allocatedBufsSize);
    - events and blocksSemaphore are created or reset, the free list of
      output buffer indexes and all counters are initialized;
    - the first thread is started and readEvent is set; further threads are
      started from ThreadFunc2() up to numStartedThreadsLimit;
    - with MTCODER__USE_WRITE_THREAD the calling thread writes the blocks
      in order; otherwise it waits for finishedEvent.

  Returns the first non-SZ_OK value among: the result of the wait / write
  loop, readRes, mtProgress.res and (without MTCODER__USE_WRITE_THREAD)
  writeRes. If that result is not SZ_OK, MtCoder_Free() is called before return.
  The early returns made by RINOK / RINOK_THREAD do not call MtCoder_Free().
*/
SRes MtCoder_Code(CMtCoder *p)
{
  unsigned numThreads = p->numThreadsMax;
  unsigned numBlocksMax;
  unsigned i;
  SRes res = SZ_OK;

  if (numThreads > MTCODER__THREADS_MAX)
      numThreads = MTCODER__THREADS_MAX;
  numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);

  /* smaller blocks get more block slots */
  if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
  if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
  if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;

  if (numBlocksMax > MTCODER__BLOCKS_MAX)
      numBlocksMax = MTCODER__BLOCKS_MAX;

  /* buffers of another size can not be reused:
     they will be allocated again in ThreadFunc2() */
  if (p->blockSize != p->allocatedBufsSize)
  {
    for (i = 0; i < MTCODER__THREADS_MAX; i++)
    {
      CMtCoderThread *t = &p->threads[i];
      if (t->inBuf)
      {
        ISzAlloc_Free(p->allocBig, t->inBuf);
        t->inBuf = NULL;
      }
    }
    p->allocatedBufsSize = p->blockSize;
  }

  p->readRes = SZ_OK;

  MtProgress_Init(&p->mtProgress, p->progress);

  #ifdef MTCODER__USE_WRITE_THREAD
    for (i = 0; i < numBlocksMax; i++)
    {
      RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
    }
  #else
    RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
  #endif

  {
    RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));

    /* the semaphore is created again for each call,
       because numBlocksMax can differ from the previous call */
    if (Semaphore_IsCreated(&p->blocksSemaphore))
    {
      RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore));
    }
    /* NOTE(review): the two numBlocksMax arguments are presumably
       (initial count, max count) - confirm against Threads.h */
    RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
  }

  /* free list of output buffer indexes: 0 -> 1 -> ... -> (MTCODER__BLOCKS_MAX - 1) -> -1 */
  for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
    p->freeBlockList[i] = i + 1;
  p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
  p->freeBlockHead = 0;

  p->readProcessed = 0;
  p->blockIndex = 0;
  p->numBlocksMax = numBlocksMax;
  p->stopReading = False;

  #ifndef MTCODER__USE_WRITE_THREAD
    p->writeIndex = 0;
    p->writeRes = SZ_OK;
    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
      p->ReadyBlocks[i] = False;
    p->numFinishedThreads = 0;
  #endif

  p->numStartedThreadsLimit = numThreads;
  p->numStartedThreads = 0;

  /* only one thread is started here */
  // for (i = 0; i < numThreads; i++)
  {
    CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
    RINOK(MtCoderThread_CreateAndStart(nextThread));
  }

  /* allow the first thread to enter the read stage */
  RINOK_THREAD(Event_Set(&p->readEvent))

  #ifdef MTCODER__USE_WRITE_THREAD
  {
    /* the calling thread writes blocks in the order of their slots,
       until the block marked as finished is processed */
    unsigned bi = 0;

    for (;; bi++)
    {
      if (bi >= numBlocksMax)
        bi = 0;

      RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))

      {
        const CMtCoderBlock *block = &p->blocks[bi];
        unsigned bufIndex = block->bufIndex;
        BoolInt finished = block->finished;
        if (res == SZ_OK && block->res != SZ_OK)
          res = block->res;

        if (bufIndex != (unsigned)(int)-1)
        {
          if (res == SZ_OK)
          {
            res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
            if (res != SZ_OK)
              MtProgress_SetError(&p->mtProgress, res);
          }

          /* return the output buffer to the free list, even after an error */
          CriticalSection_Enter(&p->cs);
          {
            p->freeBlockList[bufIndex] = p->freeBlockHead;
            p->freeBlockHead = bufIndex;
          }
          CriticalSection_Leave(&p->cs);
        }

        RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))

        if (finished)
          break;
      }
    }
  }
  #else
  {
    /* finishedEvent is set in ThreadFunc() by the last finishing thread */
    WRes wres = Event_Wait(&p->finishedEvent);
    res = MY_SRes_HRESULT_FROM_WRes(wres);
  }
  #endif

  /* the first registered error defines the result */
  if (res == SZ_OK)
    res = p->readRes;

  if (res == SZ_OK)
    res = p->mtProgress.res;

  #ifndef MTCODER__USE_WRITE_THREAD
    if (res == SZ_OK)
      res = p->writeRes;
  #endif

  if (res != SZ_OK)
    MtCoder_Free(p);

  return res;
}

#endif