
feat: support Spark expression json_array_length#4365

Open
kazantsev-maksim wants to merge 57 commits into apache:main from kazantsev-maksim:json_array_length

Conversation

@kazantsev-maksim
Contributor

Which issue does this PR close?

Closes #.

Rationale for this change

Add native support for Spark's json_array_length(jsonArray) expression so it runs on Comet instead of falling back to Spark.

What changes are included in this PR?

  • native/spark-expr/src/json_funcs/json_array_length.rs: new JsonArrayLength UDF (json_array_length) that returns the number of elements in the outermost JSON array.
  • native/spark-expr/src/comet_scalar_funcs.rs: register the new UDF.
  • spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala: register the function in jsonExpressions.

How are these changes tested?

New SQL test file spark/src/test/resources/sql-tests/expressions/json/json_array_length.sql

@@ -249,6 +249,9 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
  private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
    classOf[Cast] -> CometCast)

  private val jsonExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
    classOf[LengthOfJsonArray] -> CometScalarFunction("json_array_length"))
Member

Spark's JsonExpressionUtils.lengthOfJsonArray uses a Jackson parser configured with ALLOW_SINGLE_QUOTES and ALLOW_UNESCAPED_CONTROL_CHARS, and its streaming parsing ignores trailing content after the array's closing bracket. serde_json::from_str::<Value> is much stricter. Could you mark getSupportLevel as Incompatible(Some("...")) and add getIncompatibleReasons(), so that the auto-generated compat doc surfaces the limitation to users and the function is gated behind spark.comet.expression.LengthOfJsonArray.allowIncompatible=true?

Contributor Author

Good point, thanks!

SELECT json_array_length('not an array')

query
SELECT json_array_length('{"key":"value"}')
Member

Could you also add examples of the incompatible behavior, such as single quotes around keys and values?

Contributor Author

added


fn spark_json_array_length_array(array: &ArrayRef) -> Result<ArrayRef> {
    match array.data_type() {
        DataType::Utf8 => {
Member

Do we need to handle other variants like LargeUtf8?

}

fn get_json_array_length(json_str: &str) -> Option<i32> {
    match serde_json::from_str::<serde_json::Value>(json_str) {
Member

This is parsing and materializing the whole array just to get the length. Spark avoids this with a streaming parser, so our performance here may not be great, and we will likely use more memory than Spark.

Maybe you could look into streaming options with serde_json?
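
To make the concern concrete, here is a minimal sketch of counting top-level elements in a single pass without building a value tree. It is a hand-rolled, std-only scanner, not the serde_json API and not the code in this PR; the name count_top_level_elements is hypothetical. It does not validate tokens, so it would accept some malformed inputs that a real parser rejects, and a production version would need proper validation (with serde, a custom Visitor that skips elements is the more usual route). Like the Spark behavior described above, it stops at the outermost closing bracket and ignores anything after it.

```rust
/// Counts the elements of the outermost JSON array in one pass.
/// Hypothetical helper for illustration: it tracks nesting and string
/// state only and is NOT a validating JSON parser.
/// Returns None if the input does not start with `[` or the array never closes.
fn count_top_level_elements(input: &str) -> Option<i32> {
    let mut bytes = input.bytes().skip_while(|b| b.is_ascii_whitespace());
    if bytes.next()? != b'[' {
        return None;
    }
    let mut depth = 1usize; // 1 means "directly inside the outermost array"
    let mut in_string = false;
    let mut escaped = false;
    let mut count = 0i32; // number of commas seen at depth 1
    let mut seen_value = false; // does the current top-level slot hold anything?

    for b in bytes {
        if in_string {
            if escaped {
                escaped = false;
            } else if b == b'\\' {
                escaped = true;
            } else if b == b'"' {
                in_string = false;
            }
            continue;
        }
        match b {
            b'"' => {
                in_string = true;
                seen_value = true;
            }
            b'[' | b'{' => {
                depth += 1;
                seen_value = true;
            }
            b']' | b'}' => {
                depth -= 1;
                if depth == 0 {
                    // Closing delimiter of the outermost array.
                    if b != b']' {
                        return None;
                    }
                    return if seen_value {
                        Some(count + 1)
                    } else if count == 0 {
                        Some(0) // empty array
                    } else {
                        None // trailing comma such as [1,]
                    };
                }
            }
            b',' if depth == 1 => {
                if !seen_value {
                    return None; // empty slot such as [1,,2]
                }
                count += 1;
                seen_value = false;
            }
            b if b.is_ascii_whitespace() => {}
            _ => seen_value = true, // part of a number, literal, colon, etc.
        }
    }
    None // input ended before the outermost array closed
}
```

For example, count_top_level_elements("[[1,2],{\"a\":[3,4]},\"x,y\"]") counts three elements while holding only a few scalars of state, regardless of how large the nested values are.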

object CometLengthOfJsonArray
    extends CometScalarFunction[LengthOfJsonArray]("json_array_length") {

  private val IncompatibleReason: String =
Member

👍

-- specific language governing permissions and limitations
-- under the License.

-- Config: spark.comet.expression.LengthOfJsonArray.allowIncompatible=false
Member

Shouldn't this be setting it to true?

2 participants