From 8eb2465b3b5e4c61e7e225d1ec021636745a1cef Mon Sep 17 00:00:00 2001 From: Anamika AggarwaL Date: Wed, 15 Apr 2026 13:11:29 +0530 Subject: [PATCH 1/4] feat: Add BROTLI page decompression support for Parquet reads --- CHANGELOG.md | 1 + dataframe.cabal | 4 + src/DataFrame/IO/Parquet.hs | 5 + src/DataFrame/IO/Parquet/Brotli.hs | 146 +++++++++++++++++++++++++++++ src/DataFrame/IO/Parquet/Page.hs | 2 + tests/Parquet.hs | 44 +++++++-- tests/ParquetTestHelpers.hs | 71 ++++++++++++++ tests/data/README.md | 5 + 8 files changed, 272 insertions(+), 6 deletions(-) create mode 100644 src/DataFrame/IO/Parquet/Brotli.hs create mode 100644 tests/ParquetTestHelpers.hs diff --git a/CHANGELOG.md b/CHANGELOG.md index 7aebf7e1..f28c7061 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Add `DataFrame.Typed.Lazy` module — a type-safe lazy query pipeline combining compile-time schema tracking with deferred execution. * Add `fromCsv` function for parsing a CSV string directly into a DataFrame. * Add `DataKinds` extension and `DataFrame.Typed` import to the GHCi file for easier interactive typed dataframe workflows. +* Add BROTLI page decompression for Parquet reads and document the currently supported Parquet compression codecs. ### Performance * Specialize and inline aggregation functions (`sum`, `mean`, `variance`, `median`, `stddev`, etc.) to avoid expensive numeric conversions at runtime. 
diff --git a/dataframe.cabal b/dataframe.cabal index 9eb26c5c..f6d88081 100644 --- a/dataframe.cabal +++ b/dataframe.cabal @@ -111,6 +111,7 @@ library DataFrame.Typed.Expr, DataFrame.Typed.Lazy, DataFrame.Typed + other-modules: DataFrame.IO.Parquet.Brotli build-depends: base >= 4 && <5, deepseq >= 1 && < 2, aeson >= 0.11.0.0 && < 3, @@ -143,6 +144,8 @@ library http-conduit >= 2.3 && < 3, streamly-core >= 0.2.3 && < 0.4, streamly-bytestring >= 0.2.0 && < 0.4 + if !os(windows) + build-depends: unix >= 2 && < 3 hs-source-dirs: src default-language: Haskell2010 @@ -267,6 +270,7 @@ test-suite tests Operations.Typing, LazyParquet, Parquet, + ParquetTestHelpers, ParquetTestData, Properties, Monad diff --git a/src/DataFrame/IO/Parquet.hs b/src/DataFrame/IO/Parquet.hs index a8c85567..8bed3bc3 100644 --- a/src/DataFrame/IO/Parquet.hs +++ b/src/DataFrame/IO/Parquet.hs @@ -112,6 +112,11 @@ defaultParquetReadOptions = @ ghci> D.readParquet ".\/data\/mtcars.parquet" @ + +Page decompression currently supports Parquet files that use the +@UNCOMPRESSED@, @SNAPPY@, @GZIP@, @ZSTD@, and @BROTLI@ codecs. +@LZ4@ and @LZ4_RAW@ pages still fail with an unsupported compression error. +BROTLI pages require the @libbrotlidec@ shared library to be available at runtime; BROTLI is not supported on Windows builds. 
-} readParquet :: FilePath -> IO DataFrame readParquet = readParquetWithOpts defaultParquetReadOptions diff --git a/src/DataFrame/IO/Parquet/Brotli.hs b/src/DataFrame/IO/Parquet/Brotli.hs new file mode 100644 index 00000000..ec7156ee --- /dev/null +++ b/src/DataFrame/IO/Parquet/Brotli.hs @@ -0,0 +1,146 @@ +{-# LANGUAGE CPP #-} + +module DataFrame.IO.Parquet.Brotli (decompress) where + +import qualified Data.ByteString as BS + +#ifdef mingw32_HOST_OS +decompress :: Int -> BS.ByteString -> IO BS.ByteString +decompress _ _ = + error + "BROTLI decompression requires libbrotlidec and is not supported on Windows in this build" +#else +import Control.Exception (SomeException, try) +import qualified Data.ByteString.Internal as BSI +import qualified Data.ByteString.Unsafe as BSU +import Data.List (intercalate) +import Data.Word (Word8) +import Foreign.C.Types (CInt (..), CSize (..)) +import Foreign.ForeignPtr (withForeignPtr) +import Foreign.Marshal.Alloc (alloca, allocaBytes) +import Foreign.Ptr (FunPtr, Ptr, castPtr) +import Foreign.Storable (peek, poke) +import System.IO.Unsafe (unsafePerformIO) +import System.Posix.DynamicLinker (DL, RTLDFlags (RTLD_NOW), dlclose, dlopen, dlsym) + +data BrotliDecoder = BrotliDecoder + { _decoderHandle :: DL + , decoderDecompress :: BrotliDecoderDecompressFn + } + +type BrotliDecoderDecompressFn = + CSize -> Ptr Word8 -> Ptr CSize -> Ptr Word8 -> IO CInt + +foreign import ccall unsafe "dynamic" + mkBrotliDecoderDecompressFn :: + FunPtr BrotliDecoderDecompressFn -> BrotliDecoderDecompressFn + +brotliDecoder :: Either String BrotliDecoder +brotliDecoder = unsafePerformIO loadBrotliDecoder +{-# NOINLINE brotliDecoder #-} + +brotliLibraryCandidates :: [FilePath] +brotliLibraryCandidates = + [ "libbrotlidec.so.1" + , "libbrotlidec.so" + , "libbrotlidec.dylib" + , "/opt/homebrew/opt/brotli/lib/libbrotlidec.1.dylib" + , "/opt/homebrew/lib/libbrotlidec.dylib" + , "/usr/local/lib/libbrotlidec.dylib" + ] + +loadBrotliDecoder :: IO (Either String 
BrotliDecoder) +loadBrotliDecoder = go brotliLibraryCandidates [] + where + go [] errorsSeen = + pure $ + Left $ + unlines + [ "Unable to load libbrotlidec for Parquet BROTLI decoding." + , "Tried: " ++ intercalate ", " brotliLibraryCandidates + , "Errors:" + , unlines (map (" " ++) (reverse errorsSeen)) + ] + go (candidate : rest) errorsSeen = do + opened <- try (dlopen candidate [RTLD_NOW]) :: IO (Either SomeException DL) + case opened of + Left err -> + go rest (formatError candidate err : errorsSeen) + Right handle -> do + symbolResult <- + try (dlsym handle "BrotliDecoderDecompress") :: + IO (Either SomeException (FunPtr BrotliDecoderDecompressFn)) + case symbolResult of + Left err -> do + dlclose handle + go rest (formatError candidate err : errorsSeen) + Right fnPtr -> + pure $ + Right $ + BrotliDecoder + { _decoderHandle = handle + , decoderDecompress = mkBrotliDecoderDecompressFn fnPtr + } + + formatError candidate err = candidate ++ ": " ++ show err + +brotliDecoderSuccess :: CInt +brotliDecoderSuccess = 1 + +decompress :: Int -> BS.ByteString -> IO BS.ByteString +decompress expectedSize compressed + | expectedSize < 0 = + error ("BROTLI decompression requires a non-negative size, got " ++ show expectedSize) + | otherwise = + case brotliDecoder of + Left err -> error err + Right decoder -> + BSU.unsafeUseAsCStringLen compressed $ \(inputPtr, inputLen) -> + withOutputBuffer expectedSize $ \outputPtr -> do + actualSize <- + runDecoder + decoder + (fromIntegral inputLen) + (castPtr inputPtr) + outputPtr + expectedSize + validateDecodedSize expectedSize actualSize + +withOutputBuffer :: Int -> (Ptr Word8 -> IO ()) -> IO BS.ByteString +withOutputBuffer expectedSize useOutputPtr + | expectedSize == 0 = + allocaBytes 1 $ \outputPtr -> do + useOutputPtr outputPtr + pure BS.empty + | otherwise = do + fp <- BSI.mallocByteString expectedSize + withForeignPtr fp useOutputPtr + pure (BSI.fromForeignPtr fp 0 expectedSize) + +runDecoder :: BrotliDecoder -> CSize -> Ptr 
Word8 -> Ptr Word8 -> Int -> IO Int +runDecoder decoder inputLen inputPtr outputPtr expectedSize = + alloca $ \outputSizePtr -> do + poke outputSizePtr (fromIntegral expectedSize) + result <- + decoderDecompress decoder inputLen inputPtr outputSizePtr outputPtr + validateDecoderResult result + fromIntegral <$> peek outputSizePtr + +validateDecoderResult :: CInt -> IO () +validateDecoderResult result + | result == brotliDecoderSuccess = pure () + | otherwise = + error + ("BROTLI decompression failed with result code " ++ show result) + +validateDecodedSize :: Int -> Int -> IO () +validateDecodedSize expectedSize actualSize + | actualSize == expectedSize = pure () + | otherwise = + error + ( "BROTLI decompressed size mismatch: expected " + ++ show expectedSize + ++ " bytes, got " + ++ show actualSize + ) +#endif diff --git a/src/DataFrame/IO/Parquet/Page.hs b/src/DataFrame/IO/Parquet/Page.hs index 6c17c766..27d8c00b 100644 --- a/src/DataFrame/IO/Parquet/Page.hs +++ b/src/DataFrame/IO/Parquet/Page.hs @@ -2,6 +2,7 @@ module DataFrame.IO.Parquet.Page where +import qualified DataFrame.IO.Parquet.Brotli as Brotli import qualified Codec.Compression.GZip as GZip import qualified Codec.Compression.Zstd.Streaming as Zstd import Data.Bits @@ -61,6 +62,7 @@ readPage c columnBytes = Right res -> pure res UNCOMPRESSED -> pure compressed GZIP -> pure (LB.toStrict (GZip.decompress (BS.fromStrict compressed))) + BROTLI -> Brotli.decompress (fromIntegral (uncompressedPageSize hdr)) compressed other -> error ("Unsupported compression type: " ++ show other) pure ( Just $ Page hdr fullData diff --git a/tests/Parquet.hs b/tests/Parquet.hs index 6c35c284..ee9266e4 100644 --- a/tests/Parquet.hs +++ b/tests/Parquet.hs @@ -9,12 +9,14 @@ import qualified DataFrame as D import qualified DataFrame.Functions as F import qualified DataFrame.IO.Parquet as DP import ParquetTestData (allTypes, mtCarsDataset, tinyPagesLast10, transactions) +import ParquetTestHelpers (assertFirstColumnCodec, 
buildDataPageV1, encodePlainInt32Payload) import qualified Data.ByteString as BS import Data.Int import qualified Data.Set as S import qualified Data.Text as T -import Data.Word +import qualified Data.Vector.Unboxed as VU +import DataFrame.IO.Parquet.Page (readNInt32Vec, readPage) import DataFrame.IO.Parquet.Thrift ( columnMetaData, columnPathInSchema, @@ -23,7 +25,14 @@ import DataFrame.IO.Parquet.Thrift ( rowGroups, schema, ) -import DataFrame.IO.Parquet.Types (columnNullCount) +import DataFrame.IO.Parquet.Types ( + CompressionCodec (BROTLI), + Page (pageBytes, pageHeader), + PageHeader (pageHeaderPageType, pageTypeHeader), + PageType (DATA_PAGE), + PageTypeHeader (DataPageHeader, dataPageHeaderNumValues), + columnNullCount, + ) import DataFrame.Internal.Binary ( littleEndianWord32, littleEndianWord64, @@ -427,12 +436,34 @@ concatenatedGzipMembers = largeBrotliMap :: Test largeBrotliMap = TestCase - ( assertExpectException - "largeBrotliMap" - "BROTLI" - (D.readParquet "./tests/data/large_string_map.brotli.parquet") + ( assertFirstColumnCodec + "largeBrotliMap codec" + BROTLI + "./tests/data/large_string_map.brotli.parquet" ) +brotliPageReader :: Test +brotliPageReader = + TestCase $ do + let expectedPayload = encodePlainInt32Payload [1, 2, 3] + let compressedPayload = BS.pack [31, 11, 0, 248, 167, 1, 2, 4, 6, 86, 10, 162, 4, 0, 194, 30] + let encodedPage = buildDataPageV1 3 expectedPayload compressedPayload + (maybePage, remainder) <- readPage BROTLI encodedPage + assertEqual "brotliPageReader remainder" BS.empty remainder + case maybePage of + Nothing -> assertFailure "brotliPageReader: expected a decoded page" + Just page -> do + assertEqual "brotliPageReader page type" DATA_PAGE (pageHeaderPageType (pageHeader page)) + case pageTypeHeader (pageHeader page) of + DataPageHeader{dataPageHeaderNumValues = numValues} -> + assertEqual "brotliPageReader num values" 3 numValues + other -> + assertFailure ("brotliPageReader: expected DataPageHeader, got " ++ show 
other) + assertEqual + "brotliPageReader payload" + [1 :: Int32, 2, 3] + (VU.toList (readNInt32Vec 3 (pageBytes page))) + -- --------------------------------------------------------------------------- -- Group 3: Delta / RLE encodings (unsupported → error tests) -- --------------------------------------------------------------------------- @@ -1118,6 +1149,7 @@ tests = , lz4RawCompressedLarger , concatenatedGzipMembers , largeBrotliMap + , brotliPageReader , -- Group 3: delta / rle encodings deltaBinaryPacked , deltaByteArray diff --git a/tests/ParquetTestHelpers.hs b/tests/ParquetTestHelpers.hs new file mode 100644 index 00000000..dab598da --- /dev/null +++ b/tests/ParquetTestHelpers.hs @@ -0,0 +1,71 @@ +module ParquetTestHelpers ( + assertFirstColumnCodec, + buildDataPageV1, + encodePlainInt32Payload, +) where + +import Data.Bits ((.&.), (.|.), shiftL, shiftR, xor) +import qualified Data.ByteString as BS +import Data.Int +import Data.Word +import qualified DataFrame.IO.Parquet as DP +import DataFrame.IO.Parquet.Thrift ( + columnCodec, + columnMetaData, + compactI32, + compactStruct, + rowGroupColumns, + rowGroups, + ) +import DataFrame.IO.Parquet.Types (CompressionCodec) +import DataFrame.Internal.Binary (word32ToLittleEndian) +import Test.HUnit (Assertion, assertEqual, assertFailure) + +assertFirstColumnCodec :: String -> CompressionCodec -> FilePath -> Assertion +assertFirstColumnCodec label expected path = do + (metadata, _) <- DP.readMetadataFromPath path + case rowGroups metadata of + [] -> + assertFailure (label ++ ": parquet file has no row groups") + rowGroup : _ -> case rowGroupColumns rowGroup of + [] -> + assertFailure (label ++ ": first row group has no columns") + columnChunk : _ -> + assertEqual label expected (columnCodec (columnMetaData columnChunk)) + +buildDataPageV1 :: Int32 -> BS.ByteString -> BS.ByteString -> BS.ByteString +buildDataPageV1 numValues payload compressedPayload = + BS.pack + ( field 1 compactI32 (zigZag32 0) + ++ field 1 
compactI32 (zigZag32 (fromIntegral (BS.length payload))) + ++ field 1 compactI32 (zigZag32 (fromIntegral (BS.length compressedPayload))) + ++ [fieldHeader 2 compactStruct] + ++ field 1 compactI32 (zigZag32 numValues) + ++ field 1 compactI32 (zigZag32 0) + ++ field 1 compactI32 (zigZag32 0) + ++ field 1 compactI32 (zigZag32 0) + ++ [0x00, 0x00] + ) + <> compressedPayload + +encodePlainInt32Payload :: [Int32] -> BS.ByteString +encodePlainInt32Payload = + BS.concat + . map (word32ToLittleEndian . fromIntegral) + +field :: Word8 -> Word8 -> [Word8] -> [Word8] +field delta encodedType contents = fieldHeader delta encodedType : contents + +fieldHeader :: Word8 -> Word8 -> Word8 +fieldHeader delta encodedType = (delta `shiftL` 4) .|. encodedType + +zigZag32 :: Int32 -> [Word8] +zigZag32 n = + encodeVarInt + (fromIntegral (((fromIntegral n :: Word32) `shiftL` 1) `xor` fromIntegral (n `shiftR` 31))) + +encodeVarInt :: Word64 -> [Word8] +encodeVarInt n + | n < 0x80 = [fromIntegral n] + | otherwise = + fromIntegral ((n .&. 0x7F) .|. 0x80) : encodeVarInt (n `shiftR` 7) diff --git a/tests/data/README.md b/tests/data/README.md index 912f3ef2..58fb1caf 100644 --- a/tests/data/README.md +++ b/tests/data/README.md @@ -241,6 +241,11 @@ It is meant to exercise reading of structured data where each value is smaller than 2GB but the combined uncompressed column chunk size is greater than 2GB. +The repo keeps this fixture to verify BROTLI metadata/plumbing for a +pathological structured column chunk. End-to-end tests should avoid +materializing the full values, since doing so turns the regression into +a memory stress test rather than a codec test. 
+ ## Float16 Files The files `float16_zeros_and_nans.parquet` and `float16_nonzeros_and_nans.parquet` From e8fb7eb617fdde8646e84d1e89e6287268f36b89 Mon Sep 17 00:00:00 2001 From: Anamika AggarwaL Date: Thu, 16 Apr 2026 00:52:51 +0530 Subject: [PATCH 2/4] style: format Brotli parquet changes --- src/DataFrame/IO/Parquet/Page.hs | 2 +- tests/Parquet.hs | 11 +++++++++-- tests/ParquetTestHelpers.hs | 6 ++++-- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/DataFrame/IO/Parquet/Page.hs b/src/DataFrame/IO/Parquet/Page.hs index 27d8c00b..68f575ce 100644 --- a/src/DataFrame/IO/Parquet/Page.hs +++ b/src/DataFrame/IO/Parquet/Page.hs @@ -2,7 +2,6 @@ module DataFrame.IO.Parquet.Page where -import qualified DataFrame.IO.Parquet.Brotli as Brotli import qualified Codec.Compression.GZip as GZip import qualified Codec.Compression.Zstd.Streaming as Zstd import Data.Bits @@ -12,6 +11,7 @@ import Data.Int import Data.Maybe (fromMaybe) import qualified Data.Vector.Unboxed as VU import DataFrame.IO.Parquet.Binary +import qualified DataFrame.IO.Parquet.Brotli as Brotli import DataFrame.IO.Parquet.Thrift import DataFrame.IO.Parquet.Types import DataFrame.Internal.Binary ( diff --git a/tests/Parquet.hs b/tests/Parquet.hs index ee9266e4..61044d95 100644 --- a/tests/Parquet.hs +++ b/tests/Parquet.hs @@ -9,7 +9,11 @@ import qualified DataFrame as D import qualified DataFrame.Functions as F import qualified DataFrame.IO.Parquet as DP import ParquetTestData (allTypes, mtCarsDataset, tinyPagesLast10, transactions) -import ParquetTestHelpers (assertFirstColumnCodec, buildDataPageV1, encodePlainInt32Payload) +import ParquetTestHelpers ( + assertFirstColumnCodec, + buildDataPageV1, + encodePlainInt32Payload, + ) import qualified Data.ByteString as BS import Data.Int @@ -453,7 +457,10 @@ brotliPageReader = case maybePage of Nothing -> assertFailure "brotliPageReader: expected a decoded page" Just page -> do - assertEqual "brotliPageReader page type" DATA_PAGE (pageHeaderPageType 
(pageHeader page)) + assertEqual + "brotliPageReader page type" + DATA_PAGE + (pageHeaderPageType (pageHeader page)) case pageTypeHeader (pageHeader page) of DataPageHeader{dataPageHeaderNumValues = numValues} -> assertEqual "brotliPageReader num values" 3 numValues diff --git a/tests/ParquetTestHelpers.hs b/tests/ParquetTestHelpers.hs index dab598da..0fe0ce02 100644 --- a/tests/ParquetTestHelpers.hs +++ b/tests/ParquetTestHelpers.hs @@ -4,7 +4,7 @@ module ParquetTestHelpers ( encodePlainInt32Payload, ) where -import Data.Bits ((.&.), (.|.), shiftL, shiftR, xor) +import Data.Bits (shiftL, shiftR, xor, (.&.), (.|.)) import qualified Data.ByteString as BS import Data.Int import Data.Word @@ -62,7 +62,9 @@ fieldHeader delta encodedType = (delta `shiftL` 4) .|. encodedType zigZag32 :: Int32 -> [Word8] zigZag32 n = encodeVarInt - (fromIntegral (((fromIntegral n :: Word32) `shiftL` 1) `xor` fromIntegral (n `shiftR` 31))) + ( fromIntegral + (((fromIntegral n :: Word32) `shiftL` 1) `xor` fromIntegral (n `shiftR` 31)) + ) encodeVarInt :: Word64 -> [Word8] encodeVarInt n From fb982b238f5fa5b5e242c0f2a17c86cedd9d581e Mon Sep 17 00:00:00 2001 From: Anamika AggarwaL Date: Thu, 16 Apr 2026 01:15:33 +0530 Subject: [PATCH 3/4] fix: add Brotli deps to examples package --- examples/examples.cabal | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/examples.cabal b/examples/examples.cabal index d521a262..e86c7d3d 100644 --- a/examples/examples.cabal +++ b/examples/examples.cabal @@ -61,6 +61,7 @@ executable examples DataFrame.IO.JSON, DataFrame.IO.Parquet, DataFrame.IO.Parquet.Binary, + DataFrame.IO.Parquet.Brotli, DataFrame.IO.Parquet.Dictionary, DataFrame.IO.Parquet.Levels, DataFrame.IO.Parquet.Thrift, @@ -133,6 +134,8 @@ executable examples stm >= 2.5 && < 3, filepath >= 1.4 && < 2, Glob >= 0.10 && < 1, + if !os(windows) + build-depends: unix >= 2 && < 3 if impl(ghc >= 9.12) build-depends: ghc-typelits-natnormalise == 0.9.3 else From 
f13a60baba91ac655666ece105132e8cef53bb15 Mon Sep 17 00:00:00 2001 From: Anamika AggarwaL Date: Sat, 18 Apr 2026 09:58:02 +0530 Subject: [PATCH 4/4] chore: Import Word32 and Word64 types in Parquet test file --- tests/Parquet.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/Parquet.hs b/tests/Parquet.hs index 61044d95..6716d6db 100644 --- a/tests/Parquet.hs +++ b/tests/Parquet.hs @@ -20,6 +20,7 @@ import Data.Int import qualified Data.Set as S import qualified Data.Text as T import qualified Data.Vector.Unboxed as VU +import Data.Word (Word32, Word64) import DataFrame.IO.Parquet.Page (readNInt32Vec, readPage) import DataFrame.IO.Parquet.Thrift ( columnMetaData,