[BTRFS][UBTRFS][SHELLBTRFS] Upgrade to 1.7.2
[reactos.git] / drivers / filesystems / btrfs / calcthread.c
index 3784428..faeb9b3 100644 (file)
  * along with WinBtrfs.  If not, see <http://www.gnu.org/licenses/>. */
 
 #include "btrfs_drv.h"
+#include "xxhash.h"
+#include "crc32c.h"
 
-#define SECTOR_BLOCK 16
+void calc_thread_main(device_extension* Vcb, calc_job* cj) {
+    while (true) {
+        KIRQL irql;
+        calc_job* cj2;
+        uint8_t* src;
+        void* dest;
+        bool last_one = false;
+
+        KeAcquireSpinLock(&Vcb->calcthreads.spinlock, &irql);
+
+        if (cj && cj->not_started == 0) {
+            KeReleaseSpinLock(&Vcb->calcthreads.spinlock, irql);
+            break;
+        }
+
+        if (cj)
+            cj2 = cj;
+        else {
+            if (IsListEmpty(&Vcb->calcthreads.job_list)) {
+                KeReleaseSpinLock(&Vcb->calcthreads.spinlock, irql);
+                break;
+            }
+
+            cj2 = CONTAINING_RECORD(Vcb->calcthreads.job_list.Flink, calc_job, list_entry);
+        }
+
+        src = cj2->in;
+        dest = cj2->out;
+
+        switch (cj2->type) {
+            case calc_thread_crc32c:
+            case calc_thread_xxhash:
+            case calc_thread_sha256:
+            case calc_thread_blake2:
+                cj2->in = (uint8_t*)cj2->in + Vcb->superblock.sector_size;
+                cj2->out = (uint8_t*)cj2->out + Vcb->csum_size;
+            break;
+
+            default:
+                break;
+        }
+
+        cj2->not_started--;
+
+        if (cj2->not_started == 0) {
+            RemoveEntryList(&cj2->list_entry);
+            last_one = true;
+        }
+
+        KeReleaseSpinLock(&Vcb->calcthreads.spinlock, irql);
+
+        switch (cj2->type) {
+            case calc_thread_crc32c:
+                *(uint32_t*)dest = ~calc_crc32c(0xffffffff, src, Vcb->superblock.sector_size);
+            break;
+
+            case calc_thread_xxhash:
+                *(uint64_t*)dest = XXH64(src, Vcb->superblock.sector_size, 0);
+            break;
+
+            case calc_thread_sha256:
+                calc_sha256(dest, src, Vcb->superblock.sector_size);
+            break;
+
+            case calc_thread_blake2:
+                blake2b(dest, BLAKE2_HASH_SIZE, src, Vcb->superblock.sector_size);
+            break;
+
+            case calc_thread_decomp_zlib:
+                cj2->Status = zlib_decompress(src, cj2->inlen, dest, cj2->outlen);
+
+                if (!NT_SUCCESS(cj2->Status))
+                    ERR("zlib_decompress returned %08lx\n", cj2->Status);
+            break;
+
+            case calc_thread_decomp_lzo:
+                cj2->Status = lzo_decompress(src, cj2->inlen, dest, cj2->outlen, cj2->off);
+
+                if (!NT_SUCCESS(cj2->Status))
+                    ERR("lzo_decompress returned %08lx\n", cj2->Status);
+            break;
+
+            case calc_thread_decomp_zstd:
+                cj2->Status = zstd_decompress(src, cj2->inlen, dest, cj2->outlen);
+
+                if (!NT_SUCCESS(cj2->Status))
+                    ERR("zstd_decompress returned %08lx\n", cj2->Status);
+            break;
+
+            case calc_thread_comp_zlib:
+                cj2->Status = zlib_compress(src, cj2->inlen, dest, cj2->outlen, Vcb->options.zlib_level, &cj2->space_left);
+
+                if (!NT_SUCCESS(cj2->Status))
+                    ERR("zlib_compress returned %08lx\n", cj2->Status);
+            break;
+
+            case calc_thread_comp_lzo:
+                cj2->Status = lzo_compress(src, cj2->inlen, dest, cj2->outlen, &cj2->space_left);
+
+                if (!NT_SUCCESS(cj2->Status))
+                    ERR("lzo_compress returned %08lx\n", cj2->Status);
+            break;
+
+            case calc_thread_comp_zstd:
+                cj2->Status = zstd_compress(src, cj2->inlen, dest, cj2->outlen, Vcb->options.zstd_level, &cj2->space_left);
+
+                if (!NT_SUCCESS(cj2->Status))
+                    ERR("zstd_compress returned %08lx\n", cj2->Status);
+            break;
+        }
+
+        if (InterlockedDecrement(&cj2->left) == 0)
+            KeSetEvent(&cj2->event, 0, false);
+
+        if (last_one)
+            break;
+    }
+}
+
+void do_calc_job(device_extension* Vcb, uint8_t* data, uint32_t sectors, void* csum) {
+    KIRQL irql;
+    calc_job cj;
+
+    cj.in = data;
+    cj.out = csum;
+    cj.left = cj.not_started = sectors;
+
+    switch (Vcb->superblock.csum_type) {
+        case CSUM_TYPE_CRC32C:
+            cj.type = calc_thread_crc32c;
+        break;
+
+        case CSUM_TYPE_XXHASH:
+            cj.type = calc_thread_xxhash;
+        break;
+
+        case CSUM_TYPE_SHA256:
+            cj.type = calc_thread_sha256;
+        break;
+
+        case CSUM_TYPE_BLAKE2:
+            cj.type = calc_thread_blake2;
+        break;
+    }
+
+    KeInitializeEvent(&cj.event, NotificationEvent, false);
+
+    KeAcquireSpinLock(&Vcb->calcthreads.spinlock, &irql);
+
+    InsertTailList(&Vcb->calcthreads.job_list, &cj.list_entry);
+
+    KeSetEvent(&Vcb->calcthreads.event, 0, false);
+    KeClearEvent(&Vcb->calcthreads.event);
+
+    KeReleaseSpinLock(&Vcb->calcthreads.spinlock, irql);
+
+    calc_thread_main(Vcb, &cj);
 
-NTSTATUS add_calc_job(device_extension* Vcb, uint8_t* data, uint32_t sectors, uint32_t* csum, calc_job** pcj) {
+    KeWaitForSingleObject(&cj.event, Executive, KernelMode, false, NULL);
+}
+
+NTSTATUS add_calc_job_decomp(device_extension* Vcb, uint8_t compression, void* in, unsigned int inlen,
+                             void* out, unsigned int outlen, unsigned int off, calc_job** pcj) {
     calc_job* cj;
+    KIRQL irql;
 
     cj = ExAllocatePoolWithTag(NonPagedPool, sizeof(calc_job), ALLOC_TAG);
     if (!cj) {
@@ -28,67 +191,100 @@ NTSTATUS add_calc_job(device_extension* Vcb, uint8_t* data, uint32_t sectors, ui
         return STATUS_INSUFFICIENT_RESOURCES;
     }
 
-    cj->data = data;
-    cj->sectors = sectors;
-    cj->csum = csum;
-    cj->pos = 0;
-    cj->done = 0;
-    cj->refcount = 1;
+    cj->in = in;
+    cj->inlen = inlen;
+    cj->out = out;
+    cj->outlen = outlen;
+    cj->off = off;
+    cj->left = cj->not_started = 1;
+    cj->Status = STATUS_SUCCESS;
+
+    switch (compression) {
+        case BTRFS_COMPRESSION_ZLIB:
+            cj->type = calc_thread_decomp_zlib;
+        break;
+
+        case BTRFS_COMPRESSION_LZO:
+            cj->type = calc_thread_decomp_lzo;
+        break;
+
+        case BTRFS_COMPRESSION_ZSTD:
+            cj->type = calc_thread_decomp_zstd;
+        break;
+
+        default:
+            ERR("unexpected compression type %x\n", compression);
+            ExFreePool(cj);
+        return STATUS_NOT_SUPPORTED;
+    }
+
     KeInitializeEvent(&cj->event, NotificationEvent, false);
 
-    ExAcquireResourceExclusiveLite(&Vcb->calcthreads.lock, true);
+    KeAcquireSpinLock(&Vcb->calcthreads.spinlock, &irql);
 
     InsertTailList(&Vcb->calcthreads.job_list, &cj->list_entry);
 
     KeSetEvent(&Vcb->calcthreads.event, 0, false);
     KeClearEvent(&Vcb->calcthreads.event);
 
-    ExReleaseResourceLite(&Vcb->calcthreads.lock);
+    KeReleaseSpinLock(&Vcb->calcthreads.spinlock, irql);
 
     *pcj = cj;
 
     return STATUS_SUCCESS;
 }
 
-void free_calc_job(calc_job* cj) {
-    LONG rc = InterlockedDecrement(&cj->refcount);
-
-    if (rc == 0)
-        ExFreePool(cj);
-}
+NTSTATUS add_calc_job_comp(device_extension* Vcb, uint8_t compression, void* in, unsigned int inlen,
+                           void* out, unsigned int outlen, calc_job** pcj) {
+    calc_job* cj;
+    KIRQL irql;
 
-static bool do_calc(device_extension* Vcb, calc_job* cj) {
-    LONG pos, done;
-    uint32_t* csum;
-    uint8_t* data;
-    ULONG blocksize, i;
+    cj = ExAllocatePoolWithTag(NonPagedPool, sizeof(calc_job), ALLOC_TAG);
+    if (!cj) {
+        ERR("out of memory\n");
+        return STATUS_INSUFFICIENT_RESOURCES;
+    }
 
-    pos = InterlockedIncrement(&cj->pos) - 1;
+    cj->in = in;
+    cj->inlen = inlen;
+    cj->out = out;
+    cj->outlen = outlen;
+    cj->left = cj->not_started = 1;
+    cj->Status = STATUS_SUCCESS;
+
+    switch (compression) {
+        case BTRFS_COMPRESSION_ZLIB:
+            cj->type = calc_thread_comp_zlib;
+        break;
+
+        case BTRFS_COMPRESSION_LZO:
+            cj->type = calc_thread_comp_lzo;
+        break;
+
+        case BTRFS_COMPRESSION_ZSTD:
+            cj->type = calc_thread_comp_zstd;
+        break;
+
+        default:
+            ERR("unexpected compression type %x\n", compression);
+            ExFreePool(cj);
+        return STATUS_NOT_SUPPORTED;
+    }
 
-    if ((uint32_t)pos * SECTOR_BLOCK >= cj->sectors)
-        return false;
+    KeInitializeEvent(&cj->event, NotificationEvent, false);
 
-    csum = &cj->csum[pos * SECTOR_BLOCK];
-    data = cj->data + (pos * SECTOR_BLOCK * Vcb->superblock.sector_size);
+    KeAcquireSpinLock(&Vcb->calcthreads.spinlock, &irql);
 
-    blocksize = min(SECTOR_BLOCK, cj->sectors - (pos * SECTOR_BLOCK));
-    for (i = 0; i < blocksize; i++) {
-        *csum = ~calc_crc32c(0xffffffff, data, Vcb->superblock.sector_size);
-        csum++;
-        data += Vcb->superblock.sector_size;
-    }
+    InsertTailList(&Vcb->calcthreads.job_list, &cj->list_entry);
 
-    done = InterlockedIncrement(&cj->done);
+    KeSetEvent(&Vcb->calcthreads.event, 0, false);
+    KeClearEvent(&Vcb->calcthreads.event);
 
-    if ((uint32_t)done * SECTOR_BLOCK >= cj->sectors) {
-        ExAcquireResourceExclusiveLite(&Vcb->calcthreads.lock, true);
-        RemoveEntryList(&cj->list_entry);
-        ExReleaseResourceLite(&Vcb->calcthreads.lock);
+    KeReleaseSpinLock(&Vcb->calcthreads.spinlock, irql);
 
-        KeSetEvent(&cj->event, 0, false);
-    }
+    *pcj = cj;
 
-    return true;
+    return STATUS_SUCCESS;
 }
 
 _Function_class_(KSTART_ROUTINE)
@@ -98,32 +294,12 @@ void __stdcall calc_thread(void* context) {
 
     ObReferenceObject(thread->DeviceObject);
 
+    KeSetSystemAffinityThread((KAFFINITY)(1 << thread->number));
+
     while (true) {
         KeWaitForSingleObject(&Vcb->calcthreads.event, Executive, KernelMode, false, NULL);
 
-        while (true) {
-            calc_job* cj;
-            bool b;
-
-            ExAcquireResourceExclusiveLite(&Vcb->calcthreads.lock, true);
-
-            if (IsListEmpty(&Vcb->calcthreads.job_list)) {
-                ExReleaseResourceLite(&Vcb->calcthreads.lock);
-                break;
-            }
-
-            cj = CONTAINING_RECORD(Vcb->calcthreads.job_list.Flink, calc_job, list_entry);
-            cj->refcount++;
-
-            ExReleaseResourceLite(&Vcb->calcthreads.lock);
-
-            b = do_calc(Vcb, cj);
-
-            free_calc_job(cj);
-
-            if (!b)
-                break;
-        }
+        calc_thread_main(Vcb, NULL);
 
         if (thread->quit)
             break;