Skip to content

Commit 8270498

Browse files
authored
fix: Writing Avro for Spark (#435)
Turns out that the Java Iceberg implementation *requires* the Avro field names to match exactly instead of only relying on the field-ids/element-ids. In this case, the issue is the Partition Field Summary element records. It appears that all "struct" types in iceberg java are expected to be named ["r" + fieldID](https://github.com/apache/iceberg/blob/a7f3dc79a2f42a4875ac35eec2137ecff15204fc/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java#L100-L105) and if you don't name it appropriately, Spark throws a very unhelpful exception (`ArrayStoreException`). This was causing anything best on Java Iceberg to fail when reading Avro written by Iceberg-Go. fixes #434
1 parent f381393 commit 8270498

File tree

8 files changed

+177
-16
lines changed

8 files changed

+177
-16
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ require (
4949
github.com/uptrace/bun/extra/bundebug v1.2.11
5050
gocloud.dev v0.41.0
5151
golang.org/x/sync v0.14.0
52-
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da
5352
google.golang.org/api v0.233.0
5453
gopkg.in/yaml.v3 v3.0.1
5554
)
@@ -306,6 +305,7 @@ require (
306305
golang.org/x/text v0.25.0 // indirect
307306
golang.org/x/time v0.11.0 // indirect
308307
golang.org/x/tools v0.32.0 // indirect
308+
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
309309
google.golang.org/genproto v0.0.0-20250324211829-b45e905df463 // indirect
310310
google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 // indirect
311311
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect

internal/avro_schemas.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func WithElementID(id int) avro.SchemaOption {
9292
}
9393

9494
func init() {
95-
AvroSchemaCache.Add("field_summary", Must(avro.NewRecordSchema("field_summary", "", []*avro.Field{
95+
AvroSchemaCache.Add("field_summary", Must(avro.NewRecordSchema("r508", "", []*avro.Field{
9696
Must(avro.NewField("contains_null",
9797
BoolSchema,
9898
avro.WithDoc("true if the field contains null values"),

internal/recipe/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ RUN pip3 install pyiceberg[s3fs,hive]
2020
RUN pip3 install pyarrow
2121

2222
COPY provision.py .
23+
COPY run_spark_count_sql.py .
2324

2425
ENTRYPOINT ["./entrypoint.sh"]
2526
CMD ["notebook"]

internal/recipe/local_spark.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,42 +25,40 @@ import (
2525
"os"
2626
"testing"
2727

28-
"golang.org/x/xerrors"
29-
3028
"github.com/testcontainers/testcontainers-go/modules/compose"
3129
)
3230

3331
//go:embed docker-compose.yml
3432
var composeFile []byte
3533

36-
func Start(t *testing.T) error {
34+
func Start(t *testing.T) (*compose.DockerCompose, error) {
3735
if _, ok := os.LookupEnv("AWS_S3_ENDPOINT"); ok {
38-
return nil
36+
return nil, nil
3937
}
4038
stack, err := compose.NewDockerComposeWith(
4139
compose.WithStackReaders(bytes.NewBuffer(composeFile)),
4240
)
4341
if err != nil {
44-
return xerrors.Errorf("fail to start compose: %w", err)
42+
return nil, fmt.Errorf("fail to start compose: %w", err)
4543
}
4644
if err := stack.Up(t.Context()); err != nil {
47-
return xerrors.Errorf("fail to up compose: %w", err)
45+
return stack, fmt.Errorf("fail to up compose: %w", err)
4846
}
4947
spark, err := stack.ServiceContainer(t.Context(), "spark-iceberg")
5048
if err != nil {
51-
return xerrors.Errorf("fail to find spark-iceberg: %w", err)
49+
return stack, fmt.Errorf("fail to find spark-iceberg: %w", err)
5250
}
5351
_, stdout, err := spark.Exec(t.Context(), []string{"ipython", "./provision.py"})
5452
if err != nil {
55-
return xerrors.Errorf("fail to seed provision.py: %w", err)
53+
return stack, fmt.Errorf("fail to seed provision.py: %w", err)
5654
}
5755
data, err := io.ReadAll(stdout)
5856
if err != nil {
59-
return xerrors.Errorf("fail to read stdout: %w", err)
57+
return stack, fmt.Errorf("fail to read stdout: %w", err)
6058
}
6159
fmt.Println(string(data))
6260
t.Setenv("AWS_S3_ENDPOINT", "http://localhost:9000")
6361
t.Setenv("AWS_REGION", "us-east-1")
6462

65-
return nil
63+
return stack, nil
6664
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from pyspark.sql import SparkSession
19+
20+
spark = SparkSession.builder.getOrCreate()
21+
22+
spark.sql("SELECT COUNT(*) FROM default.test_partitioned_by_days").show()

io/s3_integration_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ package io_test
2121

2222
import (
2323
"context"
24-
"github.com/apache/iceberg-go/internal/recipe"
2524
"testing"
2625

26+
"github.com/apache/iceberg-go/internal/recipe"
27+
2728
"github.com/apache/iceberg-go"
2829
"github.com/apache/iceberg-go/catalog"
2930
sqlcat "github.com/apache/iceberg-go/catalog/sql"
@@ -33,7 +34,8 @@ import (
3334
)
3435

3536
func TestMinioWarehouse(t *testing.T) {
36-
require.NoError(t, recipe.Start(t))
37+
_, err := recipe.Start(t)
38+
require.NoError(t, err)
3739

3840
cat, err := catalog.Load(context.Background(), "default", iceberg.Properties{
3941
"uri": ":memory:",
@@ -64,7 +66,8 @@ func TestMinioWarehouse(t *testing.T) {
6466
}
6567

6668
func TestMinioWarehouseNoLocation(t *testing.T) {
67-
require.NoError(t, recipe.Start(t))
69+
_, err := recipe.Start(t)
70+
require.NoError(t, err)
6871

6972
cat, err := catalog.Load(context.Background(), "default", iceberg.Properties{
7073
"uri": ":memory:",

table/scanner_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ type ScannerSuite struct {
5252
}
5353

5454
func (s *ScannerSuite) SetupSuite() {
55-
require.NoError(s.T(), recipe.Start(s.T()))
55+
_, err := recipe.Start(s.T())
56+
require.NoError(s.T(), err)
5657
}
5758

5859
func (s *ScannerSuite) SetupTest() {

table/transaction_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//go:build integration
19+
20+
package table_test
21+
22+
import (
23+
"context"
24+
"io"
25+
"strings"
26+
"testing"
27+
"time"
28+
29+
"github.com/apache/arrow-go/v18/arrow"
30+
"github.com/apache/arrow-go/v18/arrow/array"
31+
"github.com/apache/arrow-go/v18/arrow/memory"
32+
"github.com/apache/arrow-go/v18/parquet"
33+
"github.com/apache/arrow-go/v18/parquet/pqarrow"
34+
"github.com/apache/iceberg-go"
35+
"github.com/apache/iceberg-go/catalog"
36+
"github.com/apache/iceberg-go/catalog/rest"
37+
"github.com/apache/iceberg-go/internal/recipe"
38+
iceio "github.com/apache/iceberg-go/io"
39+
"github.com/apache/iceberg-go/table"
40+
"github.com/stretchr/testify/suite"
41+
"github.com/testcontainers/testcontainers-go/modules/compose"
42+
)
43+
44+
type SparkIntegrationTestSuite struct {
45+
suite.Suite
46+
47+
ctx context.Context
48+
cat catalog.Catalog
49+
props iceberg.Properties
50+
stack *compose.DockerCompose
51+
}
52+
53+
func (s *SparkIntegrationTestSuite) SetupSuite() {
54+
var err error
55+
s.stack, err = recipe.Start(s.T())
56+
s.Require().NoError(err)
57+
if s.stack == nil {
58+
s.T().Skip("skipping test, AWS_S3_ENDPOINT is set")
59+
}
60+
}
61+
62+
func (s *SparkIntegrationTestSuite) SetupTest() {
63+
s.ctx = context.Background()
64+
s.props = iceberg.Properties{
65+
iceio.S3Region: "us-east-1",
66+
iceio.S3EndpointURL: "http://localhost:9000",
67+
iceio.S3AccessKeyID: "admin",
68+
iceio.S3SecretAccessKey: "password",
69+
}
70+
71+
cat, err := rest.NewCatalog(s.ctx, "rest", "http://localhost:8181", rest.WithAdditionalProps(s.props))
72+
s.Require().NoError(err)
73+
s.cat = cat
74+
}
75+
76+
func (s *SparkIntegrationTestSuite) TestAddFile() {
77+
const filename = "s3://warehouse/default/test_partitioned_by_days/data/ts_day=2023-03-13/supertest.parquet"
78+
79+
tbl, err := s.cat.LoadTable(s.ctx, catalog.ToIdentifier("default", "test_partitioned_by_days"), nil)
80+
s.Require().NoError(err)
81+
82+
sc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false)
83+
if err != nil {
84+
panic(err)
85+
}
86+
87+
bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
88+
defer bldr.Release()
89+
90+
tm := time.Date(2023, 03, 13, 13, 22, 0, 0, time.UTC)
91+
ts, _ := arrow.TimestampFromTime(tm, arrow.Microsecond)
92+
bldr.Field(0).(*array.Date32Builder).Append(arrow.Date32FromTime(tm))
93+
bldr.Field(1).(*array.TimestampBuilder).Append(ts)
94+
bldr.Field(2).(*array.Int32Builder).Append(13)
95+
bldr.Field(3).(*array.StringBuilder).Append("m")
96+
97+
rec := bldr.NewRecord()
98+
defer rec.Release()
99+
100+
fw, err := tbl.FS().(iceio.WriteFileIO).Create(filename)
101+
if err != nil {
102+
panic(err)
103+
}
104+
defer fw.Close()
105+
106+
if err := pqarrow.WriteTable(array.NewTableFromRecords(sc, []arrow.Record{rec}), fw, rec.NumRows(), parquet.NewWriterProperties(), pqarrow.DefaultWriterProps()); err != nil {
107+
panic(err)
108+
}
109+
110+
tx := tbl.NewTransaction()
111+
err = tx.AddFiles(s.ctx, []string{filename}, nil, false)
112+
s.Require().NoError(err)
113+
114+
tbl, err = tx.Commit(s.ctx)
115+
s.Require().NoError(err)
116+
117+
spark, err := s.stack.ServiceContainer(s.T().Context(), "spark-iceberg")
118+
s.Require().NoError(err)
119+
120+
_, stdout, err := spark.Exec(s.ctx, []string{"ipython", "./run_spark_count_sql.py"})
121+
s.Require().NoError(err)
122+
123+
output, err := io.ReadAll(stdout)
124+
s.Require().NoError(err)
125+
strings.HasSuffix(string(output), `
126+
+--------+
127+
|count(1)|
128+
+--------+
129+
| 13|
130+
+--------+
131+
`)
132+
}
133+
134+
func TestSparkIntegration(t *testing.T) {
135+
suite.Run(t, new(SparkIntegrationTestSuite))
136+
}

0 commit comments

Comments
 (0)