Commit 7e7a658

GH-2836: Support reading pure parquet files with cat (#3332)
1 parent d5df847

File tree: 2 files changed (+138 −29 lines)
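
Background: cat previously converted every source file's Parquet schema to Avro before reading, so a file whose column names are not legal Avro identifiers (such as customer-name) failed with a SchemaParseException before a single record was printed. This commit catches that exception and retries with parquet-hadoop's example Group reader. The standalone sketch below (class name and schema JSON are illustrative, not part of the patch) shows the Avro naming rule that triggers the fallback:

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

public class AvroNameCheck {
  public static void main(String[] args) {
    // Avro names must match [A-Za-z_][A-Za-z0-9_]*, so a Parquet column
    // named "customer-name" has no direct Avro equivalent.
    String json = "{\"type\": \"record\", \"name\": \"SalesRecord\", \"fields\": ["
        + "{\"name\": \"customer-name\", \"type\": \"string\"}]}";
    try {
      new Schema.Parser().parse(json);
    } catch (SchemaParseException e) {
      System.out.println("Avro rejected the schema: " + e.getMessage());
    }
  }
}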

parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java

Lines changed: 80 additions & 29 deletions
@@ -27,12 +27,14 @@
 import com.google.common.io.Closeables;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
 import org.apache.parquet.cli.BaseCommand;
 import org.apache.parquet.cli.util.Expressions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.slf4j.Logger;

 @Parameters(commandDescription = "Print the first N records from a file")
@@ -60,41 +62,90 @@ public CatCommand(Logger console, long defaultNumRecords) {
   public int run() throws IOException {
     Preconditions.checkArgument(sourceFiles != null && !sourceFiles.isEmpty(), "Missing file name");

-    // Ensure all source files have the columns specified first
-    Map<String, Schema> schemas = new HashMap<>();
     for (String source : sourceFiles) {
-      Schema schema = getAvroSchema(source);
-      schemas.put(source, Expressions.filterSchema(schema, columns));
+      try {
+        runWithAvroSchema(source);
+      } catch (SchemaParseException e) {
+        console.debug(
+            "Avro schema conversion failed for {}, falling back to Group reader: {}",
+            source,
+            e.getMessage());
+        runWithGroupReader(source);
+      }
     }

-    for (String source : sourceFiles) {
-      Schema projection = schemas.get(source);
-      Iterable<Object> reader = openDataFile(source, projection);
-      boolean threw = true;
-      long count = 0;
-      try {
-        for (Object record : reader) {
-          if (numRecords > 0 && count >= numRecords) {
-            break;
-          }
-          if (columns == null || columns.size() != 1) {
-            console.info(String.valueOf(record));
-          } else {
-            console.info(String.valueOf(select(projection, record, columns.get(0))));
-          }
-          count += 1;
+    return 0;
+  }
+
+  private void runWithAvroSchema(String source) throws IOException {
+    Schema schema = getAvroSchema(source);
+    Schema projection = Expressions.filterSchema(schema, columns);
+
+    Iterable<Object> reader = openDataFile(source, projection);
+    boolean threw = true;
+    long count = 0;
+    try {
+      for (Object record : reader) {
+        if (numRecords > 0 && count >= numRecords) {
+          break;
         }
-        threw = false;
-      } catch (RuntimeException e) {
-        throw new RuntimeException("Failed on record " + count + " in file " + source, e);
-      } finally {
-        if (reader instanceof Closeable) {
-          Closeables.close((Closeable) reader, threw);
+        if (columns == null || columns.size() != 1) {
+          console.info(String.valueOf(record));
+        } else {
+          console.info(String.valueOf(select(projection, record, columns.get(0))));
         }
+        count += 1;
+      }
+      threw = false;
+    } catch (RuntimeException e) {
+      throw new RuntimeException("Failed on record " + count + " in file " + source, e);
+    } finally {
+      if (reader instanceof Closeable) {
+        Closeables.close((Closeable) reader, threw);
       }
     }
+  }

-    return 0;
+  private void runWithGroupReader(String source) throws IOException {
+    ParquetReader<Group> reader = ParquetReader.<Group>builder(new GroupReadSupport(), qualifiedPath(source))
+        .withConf(getConf())
+        .build();
+
+    boolean threw = true;
+    long count = 0;
+    try {
+      for (Group record = reader.read(); record != null; record = reader.read()) {
+        if (numRecords > 0 && count >= numRecords) {
+          break;
+        }
+
+        if (columns == null) {
+          console.info(record.toString());
+        } else {
+          StringBuilder sb = new StringBuilder();
+          for (int i = 0; i < columns.size(); i++) {
+            String columnName = columns.get(i);
+            try {
+              Object value =
+                  record.getValueToString(record.getType().getFieldIndex(columnName), 0);
+              if (i > 0) sb.append(", ");
+              sb.append(columnName).append(": ").append(value);
+            } catch (Exception e) {
+              console.warn("Column '{}' not found in file {}", columnName, source);
+            }
+          }
+          if (sb.length() > 0) {
+            console.info(sb.toString());
+          }
+        }
+        count += 1;
+      }
+      threw = false;
+    } catch (RuntimeException e) {
+      throw new RuntimeException("Failed on record " + count + " in file " + source, e);
+    } finally {
+      Closeables.close(reader, threw);
+    }
   }

   @Override
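
Distilled from runWithGroupReader above, this is the minimal read path the fallback uses; it works for any Parquet file whether or not its schema maps to Avro. The file path and Configuration are placeholders for illustration; the command itself resolves them through BaseCommand's qualifiedPath(source) and getConf() helpers.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class GroupCatSketch {
  public static void main(String[] args) throws Exception {
    Path file = new Path("/tmp/sales.parquet"); // placeholder path
    try (ParquetReader<Group> reader = ParquetReader.<Group>builder(new GroupReadSupport(), file)
        .withConf(new Configuration())
        .build()) {
      // read() returns null once the file is exhausted
      for (Group record = reader.read(); record != null; record = reader.read()) {
        System.out.println(record);
      }
    }
  }
}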

parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java

Lines changed: 58 additions & 0 deletions
@@ -24,8 +24,15 @@
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.proto.ProtoParquetWriter;
 import org.apache.parquet.proto.test.TestProtobuf;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 import org.junit.Assert;
 import org.junit.Test;

@@ -96,6 +103,57 @@ public void testCatCommandWithSimpleReaderConfig() throws Exception {
     Assert.assertEquals(0, result);
   }

+  @Test
+  public void testCatCommandWithHyphenatedFieldNames() throws Exception {
+    File hyphenFile = new File(getTempFolder(), "hyphenated_fields.parquet");
+    writeParquetWithHyphenatedFields(hyphenFile);
+
+    CatCommand cmd = new CatCommand(createLogger(), 1);
+    cmd.sourceFiles = Arrays.asList(hyphenFile.getAbsolutePath());
+    cmd.setConf(new Configuration());
+
+    int result = cmd.run();
+    Assert.assertEquals(0, result);
+  }
+
+  private static void writeParquetWithHyphenatedFields(File file) throws IOException {
+    MessageType schema = Types.buildMessage()
+        .required(PrimitiveTypeName.INT32)
+        .named("order_id")
+        .required(PrimitiveTypeName.BINARY)
+        .named("customer-name")
+        .required(PrimitiveTypeName.BINARY)
+        .named("product-category")
+        .required(PrimitiveTypeName.DOUBLE)
+        .named("sale-amount")
+        .required(PrimitiveTypeName.BINARY)
+        .named("region")
+        .named("SalesRecord");
+
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(file.getAbsolutePath()))
+        .withType(schema)
+        .build()) {
+
+      Group record1 = factory.newGroup()
+          .append("order_id", 1001)
+          .append("customer-name", "John Smith")
+          .append("product-category", "Electronics")
+          .append("sale-amount", 299.99)
+          .append("region", "North");
+      writer.write(record1);
+
+      Group record2 = factory.newGroup()
+          .append("order_id", 1002)
+          .append("customer-name", "Jane Doe")
+          .append("product-category", "Home-Garden")
+          .append("sale-amount", 149.50)
+          .append("region", "South");
+      writer.write(record2);
+    }
+  }
+
   private static void writeProtoParquet(File file) throws Exception {
     TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder()
         .addRepeatedInt(1)
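
The new test boils down to a round trip: write a file whose schema cannot be converted to Avro, then confirm cat still exits cleanly. A compact sketch of the write side under the same assumptions (placeholder path and class name, two columns instead of the test's five):

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class WriteHyphenatedSketch {
  public static void main(String[] args) throws Exception {
    // "customer-name" is a legal Parquet name but an illegal Avro name,
    // which is exactly the case the fallback handles.
    MessageType schema = Types.buildMessage()
        .required(PrimitiveTypeName.INT32).named("order_id")
        .required(PrimitiveTypeName.BINARY).named("customer-name")
        .named("SalesRecord");

    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/hyphenated.parquet"))
        .withType(schema)
        .build()) {
      writer.write(factory.newGroup()
          .append("order_id", 1001)
          .append("customer-name", "John Smith"));
    }
    // With this commit, `parquet cat /tmp/hyphenated.parquet` prints the record
    // via the Group reader instead of failing with a SchemaParseException.
  }
}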
