diff --git a/core/trino-main/pom.xml b/core/trino-main/pom.xml
index d3c4aecab468..39b0403ba94f 100644
--- a/core/trino-main/pom.xml
+++ b/core/trino-main/pom.xml
@@ -556,4 +556,29 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <configuration>
+                        <!-- allow use of the incubating Vector API -->
+                        <compilerArgs combine.children="append">
+                            <arg>${extraJavaVectorArgs}</arg>
+                        </compilerArgs>
+                    </configuration>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-javadoc-plugin</artifactId>
+                    <configuration>
+                        <!-- allow use of the incubating Vector API -->
+                        <additionalOptions>${extraJavaVectorArgs}</additionalOptions>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
 
diff --git a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java
index 219c42b7db9b..ba4eea8de0d5 100644
--- a/core/trino-main/src/main/java/io/trino/FeaturesConfig.java
+++ b/core/trino-main/src/main/java/io/trino/FeaturesConfig.java
@@ -94,6 +94,7 @@ public class FeaturesConfig
      * default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}}
      */
     private CompressionCodec exchangeCompressionCodec = NONE;
+    private BlockSerdeVectorizedNullSuppressionStrategy blockSerdeVectorizedNullSuppressionStrategy = BlockSerdeVectorizedNullSuppressionStrategy.AUTO;
     private boolean pagesIndexEagerCompactionEnabled;
     private boolean omitDateTimeTypePrecision;
     private int maxRecursionDepth = 10;
@@ -133,6 +134,12 @@ public enum DataIntegrityVerification
         /**/;
     }
 
+    public enum BlockSerdeVectorizedNullSuppressionStrategy
+    {
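+        // AUTO uses the vectorized null-suppression path when native SIMD compress support is detected,
+        // while NONE always falls back to the scalar implementation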
+        AUTO,
+        NONE,
+    }
+
     public boolean isOmitDateTimeTypePrecision()
     {
         return omitDateTimeTypePrecision;
@@ -366,6 +373,19 @@ public FeaturesConfig setExchangeCompressionCodec(CompressionCodec exchangeCompr
         return this;
     }
 
+    @Config("experimental.blockserde-vectorized-null-suppression-strategy")
+    @ConfigDescription("Strategy used for vectorized null suppression in block serde")
+    public FeaturesConfig setBlockSerdeVectorizedNullSuppressionStrategy(BlockSerdeVectorizedNullSuppressionStrategy blockSerdeVectorizedNullSuppressionStrategy)
+    {
+        this.blockSerdeVectorizedNullSuppressionStrategy = blockSerdeVectorizedNullSuppressionStrategy;
+        return this;
+    }
+
+    public BlockSerdeVectorizedNullSuppressionStrategy getBlockSerdeVectorizedNullSuppressionStrategy()
+    {
+        return blockSerdeVectorizedNullSuppressionStrategy;
+    }
+
     public DataIntegrityVerification getExchangeDataIntegrityVerification()
     {
         return exchangeDataIntegrityVerification;
diff --git a/core/trino-main/src/main/java/io/trino/metadata/BlockEncodingManager.java b/core/trino-main/src/main/java/io/trino/metadata/BlockEncodingManager.java
index 54ab5fabf85c..8c79242397ae 100644
--- a/core/trino-main/src/main/java/io/trino/metadata/BlockEncodingManager.java
+++ b/core/trino-main/src/main/java/io/trino/metadata/BlockEncodingManager.java
@@ -13,6 +13,9 @@
  */
 package io.trino.metadata;
 
+import com.google.inject.Inject;
+import io.trino.simd.BlockEncodingSimdSupport;
+import io.trino.simd.BlockEncodingSimdSupport.SimdSupport;
 import io.trino.spi.block.ArrayBlockEncoding;
 import io.trino.spi.block.Block;
 import io.trino.spi.block.BlockEncoding;
@@ -32,6 +35,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.simd.BlockEncodingSimdSupport.TESTING_BLOCK_ENCODING_SIMD_SUPPORT;
 import static java.util.Objects.requireNonNull;
 
 public final class BlockEncodingManager
@@ -41,14 +45,19 @@ public final class BlockEncodingManager
     // for serialization
     private final Map<Class<? extends Block>, BlockEncoding> blockEncodingNamesByClass = new ConcurrentHashMap<>();
 
-    public BlockEncodingManager()
+    public static final BlockEncodingManager TESTING_BLOCK_ENCODING_MANAGER = new BlockEncodingManager(TESTING_BLOCK_ENCODING_SIMD_SUPPORT);
+
+    @Inject
+    public BlockEncodingManager(
+            BlockEncodingSimdSupport blockEncodingSimdSupport)
     {
         // add the built-in BlockEncodings
+        SimdSupport simdSupport = blockEncodingSimdSupport.getSimdSupport();
         addBlockEncoding(new VariableWidthBlockEncoding());
-        addBlockEncoding(new ByteArrayBlockEncoding());
-        addBlockEncoding(new ShortArrayBlockEncoding());
-        addBlockEncoding(new IntArrayBlockEncoding());
-        addBlockEncoding(new LongArrayBlockEncoding());
+        addBlockEncoding(new ByteArrayBlockEncoding(simdSupport.expandAndCompressByte()));
+        addBlockEncoding(new ShortArrayBlockEncoding(simdSupport.expandAndCompressShort()));
+        addBlockEncoding(new IntArrayBlockEncoding(simdSupport.expandAndCompressInt()));
+        addBlockEncoding(new LongArrayBlockEncoding(simdSupport.expandAndCompressLong()));
         addBlockEncoding(new Fixed12BlockEncoding());
         addBlockEncoding(new Int128ArrayBlockEncoding());
         addBlockEncoding(new DictionaryBlockEncoding());
diff --git a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java
index 1fa86e21fe93..a997796094b6 100644
--- a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java
+++ b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java
@@ -100,6 +100,7 @@
 import io.trino.server.protocol.PreparedStatementEncoder;
 import io.trino.server.protocol.spooling.SpoolingServerModule;
 import io.trino.server.remotetask.HttpLocationFactory;
+import io.trino.simd.BlockEncodingSimdSupport;
 import io.trino.spi.PageIndexerFactory;
 import io.trino.spi.PageSorter;
 import io.trino.spi.VersionEmbedder;
@@ -431,6 +432,9 @@ protected void setup(Binder binder)
                 .to(ServerPluginsProvider.class).in(Scopes.SINGLETON);
         configBinder(binder).bindConfig(ServerPluginsProviderConfig.class);
 
+        // SIMD support
+        binder.bind(BlockEncodingSimdSupport.class).in(Scopes.SINGLETON);
+
         // block encodings
         binder.bind(BlockEncodingManager.class).in(Scopes.SINGLETON);
         jsonBinder(binder).addSerializerBinding(Block.class).to(BlockJsonSerde.Serializer.class);
diff --git a/core/trino-main/src/main/java/io/trino/simd/BlockEncodingSimdSupport.java b/core/trino-main/src/main/java/io/trino/simd/BlockEncodingSimdSupport.java
new file mode 100644
index 000000000000..c83ce1e51530
--- /dev/null
+++ b/core/trino-main/src/main/java/io/trino/simd/BlockEncodingSimdSupport.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.simd;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.trino.FeaturesConfig;
+import io.trino.util.MachineInfo;
+import jdk.incubator.vector.ByteVector;
+import jdk.incubator.vector.IntVector;
+import jdk.incubator.vector.LongVector;
+import jdk.incubator.vector.ShortVector;
+import oshi.hardware.CentralProcessor.ProcessorIdentifier;
+
+import java.util.EnumSet;
+import java.util.Set;
+
+import static io.trino.FeaturesConfig.BlockSerdeVectorizedNullSuppressionStrategy.AUTO;
+import static io.trino.util.MachineInfo.readCpuFlags;
+import static java.util.Locale.ENGLISH;
+
+/*
+We need to specifically detect AVX512F (VPCOMPRESSD / VPCOMPRESSQ support for int and long types) and
+AVX512VBMI2 (VPCOMPRESSB / VPCOMPRESSW support for byte and short types) because we want to know whether
+Vector#compress(VectorMask) is executed natively in hardware or emulated by the JVM: the emulated path is
+much slower than the existing simple scalar code. Since the JDK vector API gives us no way to detect this
+directly, we assume that native support exists whenever the CPU advertises these instruction sets.
+ */
+@Singleton
+public final class BlockEncodingSimdSupport
+{
+    public record SimdSupport(
+            boolean expandAndCompressByte,
+            boolean expandAndCompressShort,
+            boolean expandAndCompressInt,
+            boolean expandAndCompressLong)
+    {
+        public static final SimdSupport NONE = new SimdSupport(false, false, false, false);
+        public static final SimdSupport ALL = new SimdSupport(true, true, true, true);
+    }
+
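+    // Require at least 512-bit preferred vectors: the VPCOMPRESS* instructions backing Vector#compress are AVX-512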
+    public static final int MINIMUM_SIMD_LENGTH = 512;
+    private final SimdSupport simdSupport;
+    private static final SimdSupport AUTO_DETECTED_SUPPORT = detectSimd();
+
+    public static final BlockEncodingSimdSupport TESTING_BLOCK_ENCODING_SIMD_SUPPORT = new BlockEncodingSimdSupport(true);
+
+    @Inject
+    public BlockEncodingSimdSupport(
+            FeaturesConfig featuresConfig)
+    {
+        this(featuresConfig.getBlockSerdeVectorizedNullSuppressionStrategy().equals(AUTO));
+    }
+
+    public BlockEncodingSimdSupport(
+            boolean enableAutoDetectedSimdSupport)
+    {
+        if (enableAutoDetectedSimdSupport) {
+            simdSupport = AUTO_DETECTED_SUPPORT;
+        }
+        else {
+            simdSupport = SimdSupport.NONE;
+        }
+    }
+
+    private static SimdSupport detectSimd()
+    {
+        ProcessorIdentifier id = MachineInfo.getProcessorInfo();
+
+        String vendor = id.getVendor().toLowerCase(ENGLISH);
+
+        if (vendor.contains("intel") || vendor.contains("amd")) {
+            return detectX86SimdSupport();
+        }
+
+        return SimdSupport.NONE;
+    }
+
+    private static SimdSupport detectX86SimdSupport()
+    {
+        enum X86SimdInstructionSet {
+            avx512f,
+            avx512vbmi2
+        }
+
+        Set<String> flags = readCpuFlags();
+        EnumSet<X86SimdInstructionSet> x86Flags = EnumSet.noneOf(X86SimdInstructionSet.class);
+
+        if (!flags.isEmpty()) {
+            for (X86SimdInstructionSet instructionSet : X86SimdInstructionSet.values()) {
+                if (flags.contains(instructionSet.name())) {
+                    x86Flags.add(instructionSet);
+                }
+            }
+        }
+
+        return new SimdSupport(
+                (ByteVector.SPECIES_PREFERRED.vectorBitSize() >= MINIMUM_SIMD_LENGTH) && x86Flags.contains(X86SimdInstructionSet.avx512vbmi2),
+                (ShortVector.SPECIES_PREFERRED.vectorBitSize() >= MINIMUM_SIMD_LENGTH) && x86Flags.contains(X86SimdInstructionSet.avx512vbmi2),
+                (IntVector.SPECIES_PREFERRED.vectorBitSize() >= MINIMUM_SIMD_LENGTH) && x86Flags.contains(X86SimdInstructionSet.avx512f),
+                (LongVector.SPECIES_PREFERRED.vectorBitSize() >= MINIMUM_SIMD_LENGTH) && x86Flags.contains(X86SimdInstructionSet.avx512f));
+    }
+
+    public SimdSupport getSimdSupport()
+    {
+        return simdSupport;
+    }
+}
diff --git a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java
index e41574a6fa7c..60c1bb09a8cb 100644
--- a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java
+++ b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java
@@ -80,7 +80,6 @@
 import io.trino.memory.MemoryManagerConfig;
 import io.trino.memory.NodeMemoryConfig;
 import io.trino.metadata.AnalyzePropertyManager;
-import io.trino.metadata.BlockEncodingManager;
 import io.trino.metadata.CatalogManager;
 import io.trino.metadata.ColumnPropertyManager;
 import io.trino.metadata.DisabledSystemSecurityMetadata;
@@ -252,6 +251,7 @@
 import static io.trino.execution.ParameterExtractor.bindParameters;
 import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector;
 import static io.trino.execution.warnings.WarningCollector.NOOP;
+import static io.trino.metadata.BlockEncodingManager.TESTING_BLOCK_ENCODING_MANAGER;
 import static io.trino.node.TestingInternalNodeManager.CURRENT_NODE;
 import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.connector.DynamicFilter.EMPTY;
@@ -358,10 +358,9 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
                 catalogManager,
                 notificationExecutor);
 
-        BlockEncodingManager blockEncodingManager = new BlockEncodingManager();
         TypeRegistry typeRegistry = new TypeRegistry(typeOperators, new FeaturesConfig());
         TypeManager typeManager = new InternalTypeManager(typeRegistry);
-        InternalBlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(blockEncodingManager, typeManager);
+        InternalBlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, typeManager);
         SecretsResolver secretsResolver = new SecretsResolver(ImmutableMap.of());
 
         this.globalFunctionCatalog = new GlobalFunctionCatalog(
@@ -496,7 +495,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
                 new GroupProviderManager(secretsResolver),
                 new SessionPropertyDefaults(nodeInfo, accessControl, secretsResolver),
                 typeRegistry,
-                blockEncodingManager,
+                TESTING_BLOCK_ENCODING_MANAGER,
                 new HandleResolver(),
                 exchangeManagerRegistry,
                 spoolingManagerRegistry);
diff --git a/core/trino-main/src/main/java/io/trino/util/MachineInfo.java b/core/trino-main/src/main/java/io/trino/util/MachineInfo.java
index 9011b59f40dd..74d5e257721d 100644
--- a/core/trino-main/src/main/java/io/trino/util/MachineInfo.java
+++ b/core/trino-main/src/main/java/io/trino/util/MachineInfo.java
@@ -14,14 +14,25 @@
 package io.trino.util;
 
 import com.google.common.base.StandardSystemProperty;
+import com.google.common.collect.ImmutableSet;
 import oshi.SystemInfo;
+import oshi.hardware.CentralProcessor;
+import oshi.hardware.CentralProcessor.ProcessorIdentifier;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static java.lang.Math.min;
+import static java.util.Locale.ENGLISH;
 
 public final class MachineInfo
 {
     // cache physical processor count, so that it's not queried multiple times during tests
     private static volatile int physicalProcessorCount = -1;
+    private static final SystemInfo SYSTEM_INFO = new SystemInfo();
 
     private MachineInfo() {}
 
@@ -38,7 +49,7 @@ public static int getAvailablePhysicalProcessorCount()
         if ("amd64".equals(osArch) || "x86_64".equals(osArch)) {
             // Oshi can recognize physical processor count (without hyper threading) for x86 platforms.
             // However, it doesn't correctly recognize physical processor count for ARM platforms.
-            totalPhysicalProcessorCount = new SystemInfo()
+            totalPhysicalProcessorCount = SYSTEM_INFO
                     .getHardware()
                     .getProcessor()
                     .getPhysicalProcessorCount();
@@ -52,4 +63,68 @@ public static int getAvailablePhysicalProcessorCount()
         physicalProcessorCount = min(totalPhysicalProcessorCount, availableProcessorCount);
         return physicalProcessorCount;
     }
+
+    public static ProcessorIdentifier getProcessorInfo()
+    {
+        return SYSTEM_INFO.getHardware().getProcessor().getProcessorIdentifier();
+    }
+
+    public static Set<String> readCpuFlags()
+    {
+        CentralProcessor cpu = SYSTEM_INFO.getHardware().getProcessor();
+        List<String> flags = cpu.getFeatureFlags();
+        if (flags == null || flags.isEmpty()) {
+            return ImmutableSet.of();
+        }
+        // Each element of flags holds the feature flags reported by an individual core, so compute the
+        // intersection of the flags advertised by all cores
+        Set<String> intersection = null;
+
+        for (String line : flags) {
+            if (line == null || line.isBlank()) {
+                continue;
+            }
+
+            // Strip the "flags:" / "Features:" prefix if present.
+            String body = line;
+            int colon = line.indexOf(':');
+            if (colon >= 0) {
+                body = line.substring(colon + 1);
+            }
+
+            // Tokenize + normalize.
+            Set<String> tokens = Arrays.stream(body.trim().split("\\s+"))
+                    .map(token -> normalizeFlag(token))
+                    .filter(token -> !token.isEmpty())
+                    .collect(toImmutableSet());
+
+            if (tokens.isEmpty()) {
+                continue;
+            }
+
+            if (intersection == null) {
+                intersection = new HashSet<>(tokens);
+            }
+            else {
+                intersection.retainAll(tokens);
+                if (intersection.isEmpty()) {
+                    break; // nothing in common
+                }
+            }
+        }
+
+        return intersection == null ? ImmutableSet.of() : intersection;
+    }
+
+    public static String normalizeFlag(String flag)
+    {
+        flag = flag.toLowerCase(ENGLISH).replace("_", "").trim();
+
+        // Skip stray keys that may sneak in if the colon wasn’t found.
+        if (flag.equals("flags") || flag.equals("features")) {
+            return "";
+        }
+
+        return flag;
+    }
 }
diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java b/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java
index 2d0dddb9adf5..5710ab4302ea 100644
--- a/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java
+++ b/core/trino-main/src/test/java/io/trino/execution/buffer/BenchmarkBlockSerde.java
@@ -81,7 +81,7 @@ public class BenchmarkBlockSerde
 {
     private static final DecimalType LONG_DECIMAL_TYPE = createDecimalType(30, 5);
 
-    public static final int ROWS = 10_000_000;
+    public static final int ROWS = 8192;
 
     @Benchmark
     public Object serializeLongDecimal(LongDecimalBenchmarkData data)
diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/TestPagesSerde.java b/core/trino-main/src/test/java/io/trino/execution/buffer/TestPagesSerde.java
index c2633ffb92a0..0b64bf0b28af 100644
--- a/core/trino-main/src/test/java/io/trino/execution/buffer/TestPagesSerde.java
+++ b/core/trino-main/src/test/java/io/trino/execution/buffer/TestPagesSerde.java
@@ -19,7 +19,6 @@
 import io.airlift.slice.SliceInput;
 import io.airlift.slice.SliceOutput;
 import io.airlift.slice.Slices;
-import io.trino.metadata.BlockEncodingManager;
 import io.trino.metadata.InternalBlockEncodingSerde;
 import io.trino.spi.Page;
 import io.trino.spi.PageBuilder;
@@ -47,6 +46,7 @@
 import static io.trino.execution.buffer.CompressionCodec.NONE;
 import static io.trino.execution.buffer.PagesSerdeUtil.readPages;
 import static io.trino.execution.buffer.PagesSerdeUtil.writePages;
+import static io.trino.metadata.BlockEncodingManager.TESTING_BLOCK_ENCODING_MANAGER;
 import static io.trino.operator.PageAssertions.assertPageEquals;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.DoubleType.DOUBLE;
@@ -66,7 +66,7 @@ public class TestPagesSerde
     @BeforeAll
     public void setup()
     {
-        blockEncodingSerde = new InternalBlockEncodingSerde(new BlockEncodingManager(), TESTING_TYPE_MANAGER);
+        blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, TESTING_TYPE_MANAGER);
     }
 
     @AfterAll
diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdes.java b/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdes.java
index 74b336922ace..82ed7157119b 100644
--- a/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdes.java
+++ b/core/trino-main/src/test/java/io/trino/execution/buffer/TestingPagesSerdes.java
@@ -13,17 +13,17 @@
  */
 package io.trino.execution.buffer;
 
-import io.trino.metadata.BlockEncodingManager;
 import io.trino.metadata.InternalBlockEncodingSerde;
 
 import static io.trino.execution.buffer.CompressionCodec.NONE;
+import static io.trino.metadata.BlockEncodingManager.TESTING_BLOCK_ENCODING_MANAGER;
 import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
 
 public final class TestingPagesSerdes
 {
     private TestingPagesSerdes() {}
 
-    private static final InternalBlockEncodingSerde BLOCK_ENCODING_SERDE = new InternalBlockEncodingSerde(new BlockEncodingManager(), TESTING_TYPE_MANAGER);
+    private static final InternalBlockEncodingSerde BLOCK_ENCODING_SERDE = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, TESTING_TYPE_MANAGER);
 
     public static PagesSerdeFactory createTestingPagesSerdeFactory()
     {
diff --git a/core/trino-main/src/test/java/io/trino/metadata/TestMetadataManager.java b/core/trino-main/src/test/java/io/trino/metadata/TestMetadataManager.java
index 5e1c2f09a304..eeb738cbd412 100644
--- a/core/trino-main/src/test/java/io/trino/metadata/TestMetadataManager.java
+++ b/core/trino-main/src/test/java/io/trino/metadata/TestMetadataManager.java
@@ -28,6 +28,7 @@
 import java.util.Set;
 
 import static io.trino.client.NodeVersion.UNKNOWN;
+import static io.trino.metadata.BlockEncodingManager.TESTING_BLOCK_ENCODING_MANAGER;
 import static io.trino.metadata.CatalogManager.NO_CATALOGS;
 import static io.trino.transaction.InMemoryTransactionManager.createTestTransactionManager;
 import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
@@ -98,7 +99,7 @@ public MetadataManager build()
             }
 
             if (languageFunctionManager == null) {
-                BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(new BlockEncodingManager(), typeManager);
+                BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, typeManager);
                 LanguageFunctionEngineManager engineManager = new LanguageFunctionEngineManager();
                 languageFunctionManager = new LanguageFunctionManager(new SqlParser(), typeManager, _ -> ImmutableSet.of(), blockEncodingSerde, engineManager);
             }
diff --git a/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java b/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java
index 9d46eca7ddfd..63e5eeff5c2d 100644
--- a/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java
+++ b/core/trino-main/src/test/java/io/trino/server/remotetask/TestHttpRemoteTask.java
@@ -61,6 +61,7 @@
 import io.trino.server.FailTaskRequest;
 import io.trino.server.HttpRemoteTaskFactory;
 import io.trino.server.TaskUpdateRequest;
+import io.trino.simd.BlockEncodingSimdSupport;
 import io.trino.spi.ErrorCode;
 import io.trino.spi.QueryId;
 import io.trino.spi.block.Block;
@@ -676,6 +677,7 @@ public void configure(Binder binder)
                         jsonCodecBinder(binder).bindJsonCodec(TaskUpdateRequest.class);
                         jsonCodecBinder(binder).bindJsonCodec(FailTaskRequest.class);
 
+                        binder.bind(BlockEncodingSimdSupport.class).toInstance(new BlockEncodingSimdSupport(true));
                         binder.bind(TypeManager.class).toInstance(TESTING_TYPE_MANAGER);
                         binder.bind(BlockEncodingManager.class).in(SINGLETON);
                         binder.bind(BlockEncodingSerde.class).to(InternalBlockEncodingSerde.class).in(SINGLETON);
diff --git a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java
index 46f5d501f8c7..c1241b0d2b2d 100644
--- a/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java
+++ b/core/trino-main/src/test/java/io/trino/sql/analyzer/TestFeaturesConfig.java
@@ -17,6 +17,7 @@
 import com.google.common.collect.ImmutableMap;
 import io.airlift.units.DataSize;
 import io.trino.FeaturesConfig;
+import io.trino.FeaturesConfig.BlockSerdeVectorizedNullSuppressionStrategy;
 import io.trino.FeaturesConfig.DataIntegrityVerification;
 import org.junit.jupiter.api.Test;
 
@@ -54,6 +55,7 @@ public void testDefaults()
                 .setMemoryRevokingThreshold(0.9)
                 .setMemoryRevokingTarget(0.5)
                 .setExchangeCompressionCodec(NONE)
+                .setBlockSerdeVectorizedNullSuppressionStrategy(BlockSerdeVectorizedNullSuppressionStrategy.AUTO)
                 .setExchangeDataIntegrityVerification(DataIntegrityVerification.ABORT)
                 .setPagesIndexEagerCompactionEnabled(false)
                 .setFilterAndProjectMinOutputPageSize(DataSize.of(500, KILOBYTE))
@@ -89,6 +91,7 @@ public void testExplicitPropertyMappings()
                 .put("memory-revoking-threshold", "0.2")
                 .put("memory-revoking-target", "0.8")
                 .put("exchange.compression-codec", "ZSTD")
+                .put("experimental.blockserde-vectorized-null-suppression-strategy", "NONE")
                 .put("exchange.data-integrity-verification", "RETRY")
                 .put("pages-index.eager-compaction-enabled", "true")
                 .put("filter-and-project-min-output-page-size", "1MB")
@@ -121,6 +124,7 @@ public void testExplicitPropertyMappings()
                 .setMemoryRevokingThreshold(0.2)
                 .setMemoryRevokingTarget(0.8)
                 .setExchangeCompressionCodec(ZSTD)
+                .setBlockSerdeVectorizedNullSuppressionStrategy(BlockSerdeVectorizedNullSuppressionStrategy.NONE)
                 .setExchangeDataIntegrityVerification(DataIntegrityVerification.RETRY)
                 .setPagesIndexEagerCompactionEnabled(true)
                 .setFilterAndProjectMinOutputPageSize(DataSize.of(1, MEGABYTE))
diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/TestingPlannerContext.java b/core/trino-main/src/test/java/io/trino/sql/planner/TestingPlannerContext.java
index 1d4cc3af65b6..f0c024059bc2 100644
--- a/core/trino-main/src/test/java/io/trino/sql/planner/TestingPlannerContext.java
+++ b/core/trino-main/src/test/java/io/trino/sql/planner/TestingPlannerContext.java
@@ -16,7 +16,6 @@
 import com.google.common.collect.ImmutableSet;
 import io.trino.FeaturesConfig;
 import io.trino.connector.CatalogServiceProvider;
-import io.trino.metadata.BlockEncodingManager;
 import io.trino.metadata.FunctionBundle;
 import io.trino.metadata.FunctionManager;
 import io.trino.metadata.GlobalFunctionCatalog;
@@ -51,6 +50,7 @@
 import static com.google.common.base.Preconditions.checkState;
 import static io.airlift.tracing.Tracing.noopTracer;
 import static io.trino.client.NodeVersion.UNKNOWN;
+import static io.trino.metadata.BlockEncodingManager.TESTING_BLOCK_ENCODING_MANAGER;
 import static java.util.Objects.requireNonNull;
 
 public final class TestingPlannerContext
@@ -125,7 +125,7 @@ public PlannerContext build()
             globalFunctionCatalog.addFunctions(SystemFunctionBundle.create(featuresConfig, typeOperators, new BlockTypeOperators(typeOperators), UNKNOWN));
             functionBundles.forEach(globalFunctionCatalog::addFunctions);
 
-            BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(new BlockEncodingManager(), typeManager);
+            BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(TESTING_BLOCK_ENCODING_MANAGER, typeManager);
 
             LanguageFunctionManager languageFunctionManager = new LanguageFunctionManager(
                     new SqlParser(),
diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml
index 84a907ca302e..45fc9589e3dd 100644
--- a/core/trino-spi/pom.xml
+++ b/core/trino-spi/pom.xml
@@ -328,10 +328,39 @@
                                     method double[] io.trino.spi.metrics.Distribution<T>::getPercentiles(double[])
                                 
                                 - 
+                                    <ignore>true</ignore>
                                     <code>java.method.numberOfParametersChanged</code>
                                     <old>method void io.trino.spi.eventlistener.QueryStatistics::<init>(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List<io.trino.spi.eventlistener.StageGcStatistics>, int, boolean, java.util.List<io.trino.spi.eventlistener.StageCpuDistribution>, java.util.List<io.trino.spi.eventlistener.StageOutputBufferUtilization>, java.util.List<io.trino.spi.eventlistener.StageOutputBufferMetrics>, java.util.List<io.trino.spi.eventlistener.StageTaskStatistics>, java.util.List<io.trino.spi.eventlistener.DynamicFilterDomainStatistics>, java.util.function.Supplier<java.util.List<java.lang.String>>, java.util.List<io.trino.spi.eventlistener.QueryPlanOptimizerStatistics>, java.util.Optional<java.lang.String>)</old>
                                     <new>method void io.trino.spi.eventlistener.QueryStatistics::<init>(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, java.util.Optional<java.time.Duration>, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List<io.trino.spi.eventlistener.StageGcStatistics>, int, boolean, java.util.List<io.trino.spi.eventlistener.StageCpuDistribution>, java.util.List<io.trino.spi.eventlistener.StageOutputBufferUtilization>, java.util.List<io.trino.spi.eventlistener.StageOutputBufferMetrics>, java.util.List<io.trino.spi.eventlistener.StageTaskStatistics>, java.util.List<io.trino.spi.eventlistener.DynamicFilterDomainStatistics>, java.util.function.Supplier<java.util.List<java.lang.String>>, java.util.List<io.trino.spi.eventlistener.QueryPlanOptimizerStatistics>, java.util.Map<java.lang.String, io.trino.spi.metrics.Metrics>, java.util.Optional<java.lang.String>)</new>
+                                <item>
+                                    <ignore>true</ignore>
+                                    <code>java.method.numberOfParametersChanged</code>
+                                    <old>method void io.trino.spi.block.ByteArrayBlockEncoding::<init>()</old>
+                                    <new>method void io.trino.spi.block.ByteArrayBlockEncoding::<init>(boolean)</new>
+                                    <justification>ByteArrayBlockEncoding needs to accept a parameter to enable SIMD support</justification>
+                                </item>
+                                <item>
+                                    <ignore>true</ignore>
+                                    <code>java.method.numberOfParametersChanged</code>
+                                    <old>method void io.trino.spi.block.IntArrayBlockEncoding::<init>()</old>
+                                    <new>method void io.trino.spi.block.IntArrayBlockEncoding::<init>(boolean)</new>
+                                    <justification>IntArrayBlockEncoding needs to accept a parameter to enable SIMD support</justification>
+                                </item>
+                                <item>
+                                    <ignore>true</ignore>
+                                    <code>java.method.numberOfParametersChanged</code>
+                                    <old>method void io.trino.spi.block.LongArrayBlockEncoding::<init>()</old>
+                                    <new>method void io.trino.spi.block.LongArrayBlockEncoding::<init>(boolean)</new>
+                                    <justification>LongArrayBlockEncoding needs to accept a parameter to enable SIMD support</justification>
+                                </item>
+                                <item>
+                                    <ignore>true</ignore>
+                                    <code>java.method.numberOfParametersChanged</code>
+                                    <old>method void io.trino.spi.block.ShortArrayBlockEncoding::<init>()</old>
+                                    <new>method void io.trino.spi.block.ShortArrayBlockEncoding::<init>(boolean)</new>
+                                    <justification>ShortArrayBlockEncoding needs to accept a parameter to enable SIMD support</justification>
+                                </item>
diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlockEncoding.java
index cc03f3f18dfc..7e0dc3ff4ac2 100644
--- a/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlockEncoding.java
+++ b/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlockEncoding.java
@@ -28,6 +28,12 @@ public class ByteArrayBlockEncoding
         implements BlockEncoding
 {
     public static final String NAME = "BYTE_ARRAY";
+    private final boolean enableVectorizedNullSuppression;
+
+    public ByteArrayBlockEncoding(boolean enableVectorizedNullSuppression)
+    {
+        this.enableVectorizedNullSuppression = enableVectorizedNullSuppression;
+    }
 
     @Override
     public String getName()
@@ -60,15 +66,12 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
             sliceOutput.writeBytes(rawValues, rawOffset, positionCount);
         }
         else {
-            byte[] valuesWithoutNull = new byte[positionCount];
-            int nonNullPositionCount = 0;
-            for (int i = 0; i < positionCount; i++) {
-                valuesWithoutNull[nonNullPositionCount] = rawValues[i + rawOffset];
-                nonNullPositionCount += isNull[i + rawOffset] ? 0 : 1;
+            if (enableVectorizedNullSuppression) {
+                EncoderUtil.compressBytesWithNullsVectorized(sliceOutput, rawValues, isNull, rawOffset, positionCount);
+            }
+            else {
+                EncoderUtil.compressBytesWithNullsScalar(sliceOutput, rawValues, isNull, rawOffset, positionCount);
             }
-
-            sliceOutput.writeInt(nonNullPositionCount);
-            sliceOutput.writeBytes(valuesWithoutNull, 0, nonNullPositionCount);
         }
     }
 
diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/EncoderUtil.java b/core/trino-spi/src/main/java/io/trino/spi/block/EncoderUtil.java
index 2d6c181cdfe0..8de4eb8686d3 100644
--- a/core/trino-spi/src/main/java/io/trino/spi/block/EncoderUtil.java
+++ b/core/trino-spi/src/main/java/io/trino/spi/block/EncoderUtil.java
@@ -16,13 +16,27 @@
 import io.airlift.slice.SliceInput;
 import io.airlift.slice.SliceOutput;
 import jakarta.annotation.Nullable;
+import jdk.incubator.vector.ByteVector;
+import jdk.incubator.vector.IntVector;
+import jdk.incubator.vector.LongVector;
+import jdk.incubator.vector.ShortVector;
+import jdk.incubator.vector.VectorMask;
+import jdk.incubator.vector.VectorSpecies;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Optional;
 
+import static java.util.Objects.checkFromIndexSize;
+import static java.util.Objects.requireNonNull;
+
 final class EncoderUtil
 {
+    private static final VectorSpecies<Long> LONG_SPECIES = LongVector.SPECIES_PREFERRED;
+    private static final VectorSpecies<Integer> INT_SPECIES = IntVector.SPECIES_PREFERRED;
+    private static final VectorSpecies<Byte> BYTE_SPECIES = ByteVector.SPECIES_PREFERRED;
+    private static final VectorSpecies<Short> SHORT_SPECIES = ShortVector.SPECIES_PREFERRED;
+
     private EncoderUtil() {}
 
     /**
@@ -120,4 +134,156 @@ public static byte[] retrieveNullBits(SliceInput sliceInput, int positionCount)
             throw new UncheckedIOException(e);
         }
     }
+
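+    // Null-suppressed layout written by the methods below: an int count of non-null positions, followed by the packed non-null values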
+    static void compressBytesWithNullsVectorized(SliceOutput sliceOutput, byte[] values, boolean[] isNull, int offset, int length)
+    {
+        requireNonNull(sliceOutput, "sliceOutput is null");
+        checkFromIndexSize(offset, length, values.length);
+        checkFromIndexSize(offset, length, isNull.length);
+        byte[] compressed = new byte[length];
+        int valuesIndex = 0;
+        int compressedIndex = 0;
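+        // Main loop over full vectors: invert the null mask, pack the non-null lanes to the front with compress(),
+        // then advance the output index by the number of non-null lanes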
+        for (; valuesIndex < BYTE_SPECIES.loopBound(length); valuesIndex += BYTE_SPECIES.length()) {
+            VectorMask<Byte> mask = BYTE_SPECIES.loadMask(isNull, valuesIndex + offset).not();
+            ByteVector.fromArray(BYTE_SPECIES, values, valuesIndex + offset)
+                    .compress(mask)
+                    .intoArray(compressed, compressedIndex);
+            compressedIndex += mask.trueCount();
+        }
+        for (; valuesIndex < length; valuesIndex++) {
+            compressed[compressedIndex] = values[valuesIndex + offset];
+            compressedIndex += isNull[valuesIndex + offset] ? 0 : 1;
+        }
+        sliceOutput.writeInt(compressedIndex);
+        sliceOutput.writeBytes(compressed, 0, compressedIndex);
+    }
+
+    static void compressBytesWithNullsScalar(SliceOutput sliceOutput, byte[] values, boolean[] isNull, int offset, int length)
+    {
+        requireNonNull(sliceOutput, "sliceOutput is null");
+        checkFromIndexSize(offset, length, values.length);
+        checkFromIndexSize(offset, length, isNull.length);
+        byte[] compressed = new byte[length];
+        int compressedIndex = 0;
+        for (int i = 0; i < length; i++) {
+            compressed[compressedIndex] = values[i + offset];
+            compressedIndex += isNull[i + offset] ? 0 : 1;
+        }
+        sliceOutput.writeInt(compressedIndex);
+        sliceOutput.writeBytes(compressed, 0, compressedIndex);
+    }
+
+    static void compressShortsWithNullsVectorized(SliceOutput sliceOutput, short[] values, boolean[] isNull, int offset, int length)
+    {
+        requireNonNull(sliceOutput, "sliceOutput is null");
+        checkFromIndexSize(offset, length, values.length);
+        checkFromIndexSize(offset, length, isNull.length);
+        short[] compressed = new short[length];
+        int valuesIndex = 0;
+        int compressedIndex = 0;
+        for (; valuesIndex < SHORT_SPECIES.loopBound(length); valuesIndex += SHORT_SPECIES.length()) {
+            VectorMask<Short> mask = SHORT_SPECIES.loadMask(isNull, valuesIndex + offset).not();
+            ShortVector.fromArray(SHORT_SPECIES, values, valuesIndex + offset)
+                    .compress(mask)
+                    .intoArray(compressed, compressedIndex);
+            compressedIndex += mask.trueCount();
+        }
+        for (; valuesIndex < length; valuesIndex++) {
+            compressed[compressedIndex] = values[valuesIndex + offset];
+            compressedIndex += isNull[valuesIndex + offset] ? 0 : 1;
+        }
+        sliceOutput.writeInt(compressedIndex);
+        sliceOutput.writeShorts(compressed, 0, compressedIndex);
+    }
+
+    static void compressShortsWithNullsScalar(SliceOutput sliceOutput, short[] values, boolean[] isNull, int offset, int length)
+    {
+        requireNonNull(sliceOutput, "sliceOutput is null");
+        checkFromIndexSize(offset, length, values.length);
+        checkFromIndexSize(offset, length, isNull.length);
+        short[] compressed = new short[length];
+        int compressedIndex = 0;
+        for (int i = 0; i < length; i++) {
+            compressed[compressedIndex] = values[i + offset];
+            compressedIndex += isNull[i + offset] ? 0 : 1;
+        }
+        sliceOutput.writeInt(compressedIndex);
+        sliceOutput.writeShorts(compressed, 0, compressedIndex);
+    }
+
+    static void compressIntsWithNullsVectorized(SliceOutput sliceOutput, int[] values, boolean[] isNull, int offset, int length)
+    {
+        requireNonNull(sliceOutput, "sliceOutput is null");
+        checkFromIndexSize(offset, length, values.length);
+        checkFromIndexSize(offset, length, isNull.length);
+        int[] compressed = new int[length];
+        int valuesIndex = 0;
+        int compressedIndex = 0;
+        for (; valuesIndex < INT_SPECIES.loopBound(length); valuesIndex += INT_SPECIES.length()) {
+            VectorMask<Integer> mask = INT_SPECIES.loadMask(isNull, valuesIndex + offset).not();
+            IntVector.fromArray(INT_SPECIES, values, valuesIndex + offset)
+                    .compress(mask)
+                    .intoArray(compressed, compressedIndex);
+            compressedIndex += mask.trueCount();
+        }
+        for (; valuesIndex < length; valuesIndex++) {
+            compressed[compressedIndex] = values[valuesIndex + offset];
+            compressedIndex += isNull[valuesIndex + offset] ? 0 : 1;
+        }
+        sliceOutput.writeInt(compressedIndex);
+        sliceOutput.writeInts(compressed, 0, compressedIndex);
+    }
+
+    static void compressIntsWithNullsScalar(SliceOutput sliceOutput, int[] values, boolean[] isNull, int offset, int length)
+    {
+        requireNonNull(sliceOutput, "sliceOutput is null");
+        checkFromIndexSize(offset, length, values.length);
+        checkFromIndexSize(offset, length, isNull.length);
+        int[] compressed = new int[length];
+        int compressedIndex = 0;
+        for (int i = 0; i < length; i++) {
+            compressed[compressedIndex] = values[i + offset];
+            compressedIndex += isNull[i + offset] ? 0 : 1;
+        }
+        sliceOutput.writeInt(compressedIndex);
+        sliceOutput.writeInts(compressed, 0, compressedIndex);
+    }
+
+    static void compressLongsWithNullsVectorized(SliceOutput sliceOutput, long[] values, boolean[] isNull, int offset, int length)
+    {
+        requireNonNull(sliceOutput, "sliceOutput is null");
+        checkFromIndexSize(offset, length, values.length);
+        checkFromIndexSize(offset, length, isNull.length);
+        long[] compressed = new long[length];
+        int valuesIndex = 0;
+        int compressedIndex = 0;
+        for (; valuesIndex < LONG_SPECIES.loopBound(length); valuesIndex += LONG_SPECIES.length()) {
+            VectorMask<Long> mask = LONG_SPECIES.loadMask(isNull, valuesIndex + offset).not();
+            LongVector.fromArray(LONG_SPECIES, values, valuesIndex + offset)
+                    .compress(mask)
+                    .intoArray(compressed, compressedIndex);
+            compressedIndex += mask.trueCount();
+        }
+        for (; valuesIndex < length; valuesIndex++) {
+            compressed[compressedIndex] = values[valuesIndex + offset];
+            compressedIndex += isNull[valuesIndex + offset] ? 0 : 1;
+        }
+        sliceOutput.writeInt(compressedIndex);
+        sliceOutput.writeLongs(compressed, 0, compressedIndex);
+    }
+
+    static void compressLongsWithNullsScalar(SliceOutput sliceOutput, long[] values, boolean[] isNull, int offset, int length)
+    {
+        requireNonNull(sliceOutput, "sliceOutput is null");
+        checkFromIndexSize(offset, length, values.length);
+        checkFromIndexSize(offset, length, isNull.length);
+        long[] compressed = new long[length];
+        int compressedIndex = 0;
+        for (int i = 0; i < length; i++) {
+            compressed[compressedIndex] = values[i + offset];
+            compressedIndex += isNull[i + offset] ? 0 : 1;
+        }
+        sliceOutput.writeInt(compressedIndex);
+        sliceOutput.writeLongs(compressed, 0, compressedIndex);
+    }
 }
diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlockEncoding.java
index 0f76c1d49536..09a752492cd8 100644
--- a/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlockEncoding.java
+++ b/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlockEncoding.java
@@ -27,6 +27,12 @@ public class IntArrayBlockEncoding
         implements BlockEncoding
 {
     public static final String NAME = "INT_ARRAY";
+    private final boolean enableVectorizedNullSuppression;
+
+    public IntArrayBlockEncoding(boolean enableVectorizedNullSuppression)
+    {
+        this.enableVectorizedNullSuppression = enableVectorizedNullSuppression;
+    }
 
     @Override
     public String getName()
@@ -59,15 +65,12 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
             sliceOutput.writeInts(rawValues, rawOffset, positionCount);
         }
         else {
-            int[] valuesWithoutNull = new int[positionCount];
-            int nonNullPositionCount = 0;
-            for (int i = 0; i < positionCount; i++) {
-                valuesWithoutNull[nonNullPositionCount] = rawValues[i + rawOffset];
-                nonNullPositionCount += isNull[i + rawOffset] ? 0 : 1;
+            if (enableVectorizedNullSuppression) {
+                EncoderUtil.compressIntsWithNullsVectorized(sliceOutput, rawValues, isNull, rawOffset, positionCount);
+            }
+            else {
+                EncoderUtil.compressIntsWithNullsScalar(sliceOutput, rawValues, isNull, rawOffset, positionCount);
             }
-
-            sliceOutput.writeInt(nonNullPositionCount);
-            sliceOutput.writeInts(valuesWithoutNull, 0, nonNullPositionCount);
         }
     }
 
diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlockEncoding.java
index 38759e13ef21..ea890bcd5f47 100644
--- a/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlockEncoding.java
+++ b/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlockEncoding.java
@@ -27,6 +27,12 @@ public class LongArrayBlockEncoding
         implements BlockEncoding
 {
     public static final String NAME = "LONG_ARRAY";
+    private final boolean enableVectorizedNullSuppression;
+
+    public LongArrayBlockEncoding(boolean enableVectorizedNullSuppression)
+    {
+        this.enableVectorizedNullSuppression = enableVectorizedNullSuppression;
+    }
 
     @Override
     public String getName()
@@ -59,15 +65,12 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
             sliceOutput.writeLongs(rawValues, rawOffset, positionCount);
         }
         else {
-            long[] valuesWithoutNull = new long[positionCount];
-            int nonNullPositionCount = 0;
-            for (int i = 0; i < positionCount; i++) {
-                valuesWithoutNull[nonNullPositionCount] = rawValues[i + rawOffset];
-                nonNullPositionCount += isNull[i + rawOffset] ? 0 : 1;
+            if (enableVectorizedNullSuppression) {
+                EncoderUtil.compressLongsWithNullsVectorized(sliceOutput, rawValues, isNull, rawOffset, positionCount);
+            }
+            else {
+                EncoderUtil.compressLongsWithNullsScalar(sliceOutput, rawValues, isNull, rawOffset, positionCount);
             }
-
-            sliceOutput.writeInt(nonNullPositionCount);
-            sliceOutput.writeLongs(valuesWithoutNull, 0, nonNullPositionCount);
         }
     }
 
diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlockEncoding.java
index 512164c605d5..eee56037e0ac 100644
--- a/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlockEncoding.java
+++ b/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlockEncoding.java
@@ -27,6 +27,12 @@ public class ShortArrayBlockEncoding
         implements BlockEncoding
 {
     public static final String NAME = "SHORT_ARRAY";
+    private final boolean enableVectorizedNullSuppression;
+
+    public ShortArrayBlockEncoding(boolean enableVectorizedNullSuppression)
+    {
+        this.enableVectorizedNullSuppression = enableVectorizedNullSuppression;
+    }
 
     @Override
     public String getName()
@@ -59,15 +65,12 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
             sliceOutput.writeShorts(rawValues, rawOffset, positionCount);
         }
         else {
-            short[] valuesWithoutNull = new short[positionCount];
-            int nonNullPositionCount = 0;
-            for (int i = 0; i < positionCount; i++) {
-                valuesWithoutNull[nonNullPositionCount] = rawValues[i + rawOffset];
-                nonNullPositionCount += isNull[i + rawOffset] ? 0 : 1;
+            if (enableVectorizedNullSuppression) {
+                EncoderUtil.compressShortsWithNullsVectorized(sliceOutput, rawValues, isNull, rawOffset, positionCount);
+            }
+            else {
+                EncoderUtil.compressShortsWithNullsScalar(sliceOutput, rawValues, isNull, rawOffset, positionCount);
             }
-
-            sliceOutput.writeInt(nonNullPositionCount);
-            sliceOutput.writeShorts(valuesWithoutNull, 0, nonNullPositionCount);
         }
     }
 
diff --git a/core/trino-spi/src/main/java/module-info.java b/core/trino-spi/src/main/java/module-info.java
index c1ca692fb861..10f9218e0934 100644
--- a/core/trino-spi/src/main/java/module-info.java
+++ b/core/trino-spi/src/main/java/module-info.java
@@ -17,6 +17,7 @@
     requires transitive io.opentelemetry.api;
     requires jakarta.annotation;
     requires transitive slice;
+    requires jdk.incubator.vector;
 
     exports io.trino.spi;
     exports io.trino.spi.block;
diff --git a/core/trino-spi/src/test/java/io/trino/spi/block/TestEncoderUtil.java b/core/trino-spi/src/test/java/io/trino/spi/block/TestEncoderUtil.java
new file mode 100644
index 000000000000..abf241d208f5
--- /dev/null
+++ b/core/trino-spi/src/test/java/io/trino/spi/block/TestEncoderUtil.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.spi.block;
+
+import io.airlift.slice.DynamicSliceOutput;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Execution(ExecutionMode.SAME_THREAD)
+public class TestEncoderUtil
+{
+    private static final int[] TEST_LENGTHS = {0, 3, 255, 257, 512, 530, 1024, 2048, 8192};
+    private static final int[] TEST_OFFSETS = {0, 2, 256};
+    private static final long RANDOM_SEED = 42;
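+    // Fixed seed keeps the generated values and null masks deterministic across test runs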
+
+    @Test
+    public void testBytesScalarEqualsVector()
+    {
+        for (int length : TEST_LENGTHS) {
+            for (int offset : TEST_OFFSETS) {
+                byte[] values = randomBytes(offset + length);
+                for (boolean[] isNull : getIsNullArray(offset + length)) {
+                    byte[] scalar = compressBytesScalar(values, isNull, offset, length);
+                    byte[] vector = compressBytesVectorized(values, isNull, offset, length);
+                    assertThat(vector).as("bytes: scalar and vector outputs differ").isEqualTo(scalar);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testShortsScalarEqualsVector()
+    {
+        for (int length : TEST_LENGTHS) {
+            for (int offset : TEST_OFFSETS) {
+                short[] values = randomShorts(offset + length);
+                for (boolean[] isNull : getIsNullArray(offset + length)) {
+                    byte[] scalar = compressShortsScalar(values, isNull, offset, length);
+                    byte[] vector = compressShortsVectorized(values, isNull, offset, length);
+                    assertThat(vector).as("shorts: scalar and vector outputs differ").isEqualTo(scalar);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testIntsScalarEqualsVector()
+    {
+        for (int length : TEST_LENGTHS) {
+            for (int offset : TEST_OFFSETS) {
+                int[] values = randomInts(offset + length);
+                for (boolean[] isNull : getIsNullArray(offset + length)) {
+                    byte[] scalar = compressIntsScalar(values, isNull, offset, length);
+                    byte[] vector = compressIntsVectorized(values, isNull, offset, length);
+                    assertThat(vector).as("ints: scalar and vector outputs differ").isEqualTo(scalar);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testLongsScalarEqualsVector()
+    {
+        for (int length : TEST_LENGTHS) {
+            for (int offset : TEST_OFFSETS) {
+                long[] values = randomLongs(offset + length);
+                for (boolean[] isNull : getIsNullArray(offset + length)) {
+                    byte[] scalar = compressLongsScalar(values, isNull, offset, length);
+                    byte[] vector = compressLongsVectorized(values, isNull, offset, length);
+                    assertThat(vector).as("longs: scalar and vector outputs differ").isEqualTo(scalar);
+                }
+            }
+        }
+    }
+
+    public static boolean[][] getIsNullArray(int length)
+    {
+        return new boolean[][] {
+                all(false, length),
+                all(true, length),
+                alternating(length),
+                randomBooleans(length)};
+    }
+
+    static byte[] compressBytesScalar(byte[] values, boolean[] isNull, int offset, int length)
+    {
+        DynamicSliceOutput out = new DynamicSliceOutput(length * (Byte.BYTES + 4));
+        EncoderUtil.compressBytesWithNullsScalar(out, values, isNull, offset, length);
+        return out.slice().getBytes();
+    }
+
+    private static byte[] compressBytesVectorized(byte[] values, boolean[] isNull, int offset, int length)
+    {
+        DynamicSliceOutput out = new DynamicSliceOutput(length * (Byte.BYTES + 4));
+        EncoderUtil.compressBytesWithNullsVectorized(out, values, isNull, offset, length);
+        return out.slice().getBytes();
+    }
+
+    private static byte[] compressShortsScalar(short[] values, boolean[] isNull, int offset, int length)
+    {
+        DynamicSliceOutput out = new DynamicSliceOutput(length * (Short.BYTES + 4));
+        EncoderUtil.compressShortsWithNullsScalar(out, values, isNull, offset, length);
+        return out.slice().getBytes();
+    }
+
+    private static byte[] compressShortsVectorized(short[] values, boolean[] isNull, int offset, int length)
+    {
+        DynamicSliceOutput out = new DynamicSliceOutput(length * (Short.BYTES + 4));
+        EncoderUtil.compressShortsWithNullsVectorized(out, values, isNull, offset, length);
+        return out.slice().getBytes();
+    }
+
+    private static byte[] compressIntsScalar(int[] values, boolean[] isNull, int offset, int length)
+    {
+        DynamicSliceOutput out = new DynamicSliceOutput(length * (Integer.BYTES + 4));
+        EncoderUtil.compressIntsWithNullsScalar(out, values, isNull, offset, length);
+        return out.slice().getBytes();
+    }
+
+    private static byte[] compressIntsVectorized(int[] values, boolean[] isNull, int offset, int length)
+    {
+        DynamicSliceOutput out = new DynamicSliceOutput(length * (Integer.BYTES + 4));
+        EncoderUtil.compressIntsWithNullsVectorized(out, values, isNull, offset, length);
+        return out.slice().getBytes();
+    }
+
+    static byte[] compressLongsScalar(long[] values, boolean[] isNull, int offset, int length)
+    {
+        DynamicSliceOutput out = new DynamicSliceOutput(length * (Long.BYTES + 4));
+        EncoderUtil.compressLongsWithNullsScalar(out, values, isNull, offset, length);
+        return out.slice().getBytes();
+    }
+
+    private static byte[] compressLongsVectorized(long[] values, boolean[] isNull, int offset, int length)
+    {
+        DynamicSliceOutput out = new DynamicSliceOutput(length * (Long.BYTES + 4));
+        EncoderUtil.compressLongsWithNullsVectorized(out, values, isNull, offset, length);
+        return out.slice().getBytes();
+    }
+
+    private static byte[] randomBytes(int size)
+    {
+        byte[] data = new byte[size];
+        Random r = new Random(RANDOM_SEED);
+        r.nextBytes(data);
+        return data;
+    }
+
+    private static short[] randomShorts(int size)
+    {
+        short[] data = new short[size];
+        Random r = new Random(RANDOM_SEED);
+        for (int i = 0; i < size; i++) {
+            data[i] = (short) r.nextInt();
+        }
+        return data;
+    }
+
+    private static int[] randomInts(int size)
+    {
+        int[] data = new int[size];
+        Random r = new Random(RANDOM_SEED);
+        for (int i = 0; i < size; i++) {
+            data[i] = r.nextInt();
+        }
+        return data;
+    }
+
+    private static long[] randomLongs(int size)
+    {
+        long[] data = new long[size];
+        Random r = new Random(RANDOM_SEED);
+        for (int i = 0; i < size; i++) {
+            data[i] = r.nextLong();
+        }
+        return data;
+    }
+
+    private static boolean[] all(boolean value, int size)
+    {
+        boolean[] out = new boolean[size];
+        if (value) {
+            for (int i = 0; i < size; i++) {
+                out[i] = true;
+            }
+        }
+        return out;
+    }
+
+    private static boolean[] alternating(int size)
+    {
+        boolean[] out = new boolean[size];
+        for (int i = 0; i < size; i++) {
+            out[i] = (i % 2) == 0;
+        }
+        return out;
+    }
+
+    private static boolean[] randomBooleans(int size)
+    {
+        boolean[] out = new boolean[size];
+        Random r = new Random(RANDOM_SEED);
+        for (int i = 0; i < size; i++) {
+            out[i] = r.nextDouble() < 0.3;
+        }
+        return out;
+    }
+}
diff --git a/core/trino-spi/src/test/java/io/trino/spi/block/TestingBlockEncodingSerde.java b/core/trino-spi/src/test/java/io/trino/spi/block/TestingBlockEncodingSerde.java
index 248eebf6323c..24b2731f51f9 100644
--- a/core/trino-spi/src/test/java/io/trino/spi/block/TestingBlockEncodingSerde.java
+++ b/core/trino-spi/src/test/java/io/trino/spi/block/TestingBlockEncodingSerde.java
@@ -47,10 +47,10 @@ public TestingBlockEncodingSerde(Function<TypeId, Type> types)
         this.types = requireNonNull(types, "types is null");
         // add the built-in BlockEncodings
         addBlockEncoding(new VariableWidthBlockEncoding());
-        addBlockEncoding(new ByteArrayBlockEncoding());
-        addBlockEncoding(new ShortArrayBlockEncoding());
-        addBlockEncoding(new IntArrayBlockEncoding());
-        addBlockEncoding(new LongArrayBlockEncoding());
+        addBlockEncoding(new ByteArrayBlockEncoding(true));
+        addBlockEncoding(new ShortArrayBlockEncoding(true));
+        addBlockEncoding(new IntArrayBlockEncoding(true));
+        addBlockEncoding(new LongArrayBlockEncoding(true));
         addBlockEncoding(new Fixed12BlockEncoding());
         addBlockEncoding(new Int128ArrayBlockEncoding());
         addBlockEncoding(new DictionaryBlockEncoding());