Skip to content

Commit c082326

Browse files
authored
fix(table): writing map column with multiple entries in a row (#598)
fixes #595 Turns out we were incorrectly constructing the Map column inside of `ToRequestedSchema`. I also discovered a potential memory leak while testing this and fixed that with an added `defer batch.Release()` to correctly handle references.
1 parent 325414a commit c082326

File tree

3 files changed

+88
-1
lines changed

3 files changed

+88
-1
lines changed

table/arrow_utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -896,7 +896,7 @@ func (a *arrowProjectionVisitor) Map(m iceberg.MapType, mapArray, keyResult, val
896896
valField := a.constructField(m.ValueField(), vals.DataType())
897897

898898
mapType := arrow.MapOfWithMetadata(keyField.Type, keyField.Metadata, valField.Type, valField.Metadata)
899-
childData := array.NewData(mapType.Elem(), arr.Len(), []*memory.Buffer{nil},
899+
childData := array.NewData(mapType.Elem(), arr.Data().Children()[0].Len(), []*memory.Buffer{nil},
900900
[]arrow.ArrayData{keys.Data(), vals.Data()}, 0, 0)
901901
defer childData.Release()
902902
newData := array.NewData(mapType, arr.Len(), arr.Data().Buffers(),

table/table_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434

3535
"github.com/apache/arrow-go/v18/arrow"
3636
"github.com/apache/arrow-go/v18/arrow/array"
37+
"github.com/apache/arrow-go/v18/arrow/compute"
3738
"github.com/apache/arrow-go/v18/arrow/memory"
3839
"github.com/apache/arrow-go/v18/parquet/pqarrow"
3940
"github.com/apache/iceberg-go"
@@ -1473,6 +1474,91 @@ func (t *TableWritingTestSuite) TestDeleteOldMetadataNoErrorLogsOnFileFound() {
14731474
t.NotContains(logOutput, "no such file or directory")
14741475
}
14751476

1477+
// testing issue reported in https://github.com/apache/iceberg-go/issues/595
1478+
func TestWriteMapType(t *testing.T) {
1479+
loc := filepath.ToSlash(t.TempDir())
1480+
1481+
cat, err := catalog.Load(context.Background(), "default", iceberg.Properties{
1482+
"uri": ":memory:",
1483+
"type": "sql",
1484+
sql.DriverKey: sqliteshim.ShimName,
1485+
sql.DialectKey: string(sql.SQLite),
1486+
"warehouse": "file://" + loc,
1487+
})
1488+
require.NoError(t, err)
1489+
1490+
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
1491+
defer mem.AssertSize(t, 0)
1492+
1493+
ctx := compute.WithAllocator(context.Background(), mem)
1494+
cat.CreateNamespace(ctx, catalog.ToIdentifier("default"), nil)
1495+
iceSch := iceberg.NewSchema(1,
1496+
iceberg.NestedField{
1497+
ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.String, Required: true,
1498+
},
1499+
iceberg.NestedField{
1500+
ID: 2, Name: "attrs", Required: true, Type: &iceberg.MapType{
1501+
KeyID: 3,
1502+
KeyType: iceberg.PrimitiveTypes.String,
1503+
ValueID: 4,
1504+
ValueType: iceberg.PrimitiveTypes.String,
1505+
ValueRequired: false,
1506+
},
1507+
})
1508+
1509+
ident := catalog.ToIdentifier("default", "repro_map")
1510+
tbl, err := cat.CreateTable(ctx, ident, iceSch, catalog.WithLocation(loc))
1511+
require.NoError(t, err)
1512+
1513+
arrowSch, err := table.SchemaToArrowSchema(iceSch, nil, true, false)
1514+
require.NoError(t, err)
1515+
1516+
bldr := array.NewRecordBuilder(mem, arrowSch)
1517+
defer bldr.Release()
1518+
1519+
idbldr := bldr.Field(0).(*array.StringBuilder)
1520+
attrBldr := bldr.Field(1).(*array.MapBuilder)
1521+
attrKeyBldr := attrBldr.KeyBuilder().(*array.StringBuilder)
1522+
attrItemBldr := attrBldr.ItemBuilder().(*array.StringBuilder)
1523+
1524+
idbldr.Append("row-0")
1525+
attrBldr.Append(true)
1526+
attrKeyBldr.Append("a")
1527+
attrItemBldr.Append("1")
1528+
1529+
idbldr.Append("row-1")
1530+
attrBldr.Append(true)
1531+
attrKeyBldr.AppendValues([]string{"b", "c"}, nil)
1532+
attrItemBldr.AppendValues([]string{"2", "3"}, nil)
1533+
1534+
rec := bldr.NewRecordBatch()
1535+
defer rec.Release()
1536+
1537+
rr, err := array.NewRecordReader(arrowSch, []arrow.RecordBatch{rec})
1538+
require.NoError(t, err)
1539+
defer rr.Release()
1540+
1541+
result, err := tbl.Append(ctx, rr, nil)
1542+
require.NoError(t, err)
1543+
1544+
resultTbl, err := result.Scan().ToArrowTable(ctx)
1545+
require.NoError(t, err)
1546+
defer resultTbl.Release()
1547+
1548+
expectedSchema, err := table.SchemaToArrowSchema(iceSch, nil, false, false)
1549+
require.NoError(t, err)
1550+
expected, err := array.TableFromJSON(mem, expectedSchema, []string{
1551+
`[
1552+
{"id": "row-0", "attrs": [{"key": "a", "value": "1"}]},
1553+
{"id": "row-1", "attrs": [{"key": "b", "value": "2"}, {"key": "c", "value": "3"}]}
1554+
]`,
1555+
})
1556+
require.NoError(t, err)
1557+
defer expected.Release()
1558+
1559+
require.True(t, array.TableEqual(expected, resultTbl), "expected:\n %s\ngot:\n %s", expected, resultTbl)
1560+
}
1561+
14761562
func (t *TableTestSuite) TestRefresh() {
14771563
cat, err := catalog.Load(context.Background(), "default", iceberg.Properties{
14781564
"uri": ":memory:",

table/writer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func (w *writer) writeFile(ctx context.Context, partitionValues map[int]any, tas
6868
return nil, err
6969
}
7070
batches[i] = rec
71+
defer rec.Release()
7172
}
7273

7374
statsCols, err := computeStatsPlan(w.fileSchema, w.meta.props)

0 commit comments

Comments
 (0)