diff --git a/BigQuery/src/BigQueryClient.php b/BigQuery/src/BigQueryClient.php index df14869c75f3..ab816ed3fee2 100644 --- a/BigQuery/src/BigQueryClient.php +++ b/BigQuery/src/BigQueryClient.php @@ -417,10 +417,44 @@ public function runQuery(JobConfigurationInterface $query, array $options = []) ], $options); $queryResultsOptions['initialTimeoutMs'] = 10000; - $queryResults = $this->startQuery( - $query, - $options - )->queryResults($queryResultsOptions + $options); + if ($query instanceof QueryJobConfiguration && $query->isStateless()) { + $queryRequest = $query->toQueryRequest(); + + // The flattened notation does not work for POST request without special handling. + // Propagate it for backwards compatibility. + if (isset($queryResultsOptions['formatOptions.useInt64Timestamp'])) { + $useInt64 = $this->pluck('formatOptions.useInt64Timestamp', $queryResultsOptions, false); + $queryResultsOptions['formatOptions']['useInt64Timestamp'] = $useInt64; + } + + $statelessArgs = $queryRequest + $queryResultsOptions + [ + 'projectId' => $this->projectId + ] + $options; + + if (!isset($statelessArgs['timeoutMs'])) { + $statelessArgs['timeoutMs'] = $statelessArgs['initialTimeoutMs']; + } + + $response = $this->connection->query($statelessArgs); + + if ($response['jobComplete'] ?? false) { + return new QueryResults( + $this->connection, + '', + $this->projectId, + $response, + $this->mapper, + $this->createJob([], ''), // create an empty job + $queryResultsOptions + ); + } + + $job = $this->createJob($response, $response['jobReference']['jobId']); + } else { + $job = $this->startQuery($query, $options); + } + + $queryResults = $job->queryResults($queryResultsOptions + $options); $queryResults->waitUntilComplete(); return $queryResults; } @@ -772,12 +806,17 @@ public function startJob(JobConfigurationInterface $config, array $options = []) $response = $this->connection->insertJob($config); } + return $this->createJob($response, $config['jobReference']['jobId']); + } + + private function createJob(array $info, string $jobId) + { return new Job( $this->connection, - $config['jobReference']['jobId'], + $jobId, $this->projectId, $this->mapper, - $response + $info ); } diff --git a/BigQuery/src/QueryJobConfiguration.php b/BigQuery/src/QueryJobConfiguration.php index 40e1739a2480..59d25135fff0 100644 --- a/BigQuery/src/QueryJobConfiguration.php +++ b/BigQuery/src/QueryJobConfiguration.php @@ -37,10 +37,13 @@ class QueryJobConfiguration implements JobConfigurationInterface { use JobConfigurationTrait; + private const JOB_CREATION_MODE_OPTIONAL = 'JOB_CREATION_OPTIONAL'; + /** * @var ValueMapper Maps values between PHP and BigQuery. */ private $mapper; + private bool $isJobIdGenerated = false; /** * @param ValueMapper $mapper Maps values between PHP and BigQuery. @@ -58,6 +61,13 @@ public function __construct( $this->mapper = $mapper; $this->jobConfigurationProperties($projectId, $config, $location); + if (!isset($config['jobReference']['jobId'])) { + // If the user did not submit a jobId to the configuration array, the library will create a JobId before + // the request is sent. This is used to keep track if it is the user who set the JobId or the library + /// for stateless queries. + $this->isJobIdGenerated = true; + } + if (!isset($this->config['configuration']['query']['useLegacySql'])) { $this->config['configuration']['query']['useLegacySql'] = false; } @@ -593,4 +603,89 @@ public function writeDisposition($writeDisposition) return $this; } + + /** + * Returns true if the current configuration is compatible with the stateless query API. + * False if not + * + * @internal + * @return bool + */ + public function isStateless(): bool + { + $config = $this->config; + $queryConfig = $config['configuration']['query']; + + if (isset($queryConfig['destinationTable']) || + isset($queryConfig['tableDefinitions']) || + isset($queryConfig['createDisposition']) || + isset($queryConfig['writeDisposition']) || + ($queryConfig['useLegacySql'] ?? false) || + isset($queryConfig['maximumBillingTier']) || + isset($queryConfig['timePartitioning']) || + isset($queryConfig['rangePartitioning']) || + isset($queryConfig['clustering']) || + isset($queryConfig['destinationEncryptionConfiguration']) || + isset($queryConfig['schemaUpdateOptions']) || + isset($queryConfig['jobTimeoutMs']) + ) { + return false; + } + + if (isset($queryConfig['priority']) && $queryConfig['priority'] !== 'INTERACTIVE') { + return false; + } + + if ($config['configuration']['dryRun'] ?? false) { + return false; + } + + // Creating a jobConfiguration from the library sets the JobId always meaning we do not have a way + // to determine if this jobId was set by the user or our library. + // We check if this was autogenerated to circumvent this issue. + if (isset($config['jobReference']['jobId']) && !$this->isJobIdGenerated()) { + return false; + } + + return true; + } + + /** + * Returns an array representation of a QueryRequest: + * [QueryRequest](https://docs.cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest) + * + * @return array + */ + public function toQueryRequest(): array + { + $config = $this->config; + $queryConfig = $config['configuration']['query']; + + return [ + 'query' => $queryConfig['query'], + 'maxResults' => $queryConfig['maxResults'] ?? null, + 'defaultDataset' => $queryConfig['defaultDataset'] ?? null, + 'timeoutMs' => $queryConfig['timeoutMs'] ?? null, + 'useQueryCache' => $queryConfig['useQueryCache'] ?? null, + 'useLegacySql' => false, + 'queryParameters' => $queryConfig['queryParameters'] ?? null, + 'parameterMode' => $queryConfig['parameterMode'] ?? null, + 'labels' => $config['configuration']['labels'] ?? null, + 'createSession' => $queryConfig['createSession'] ?? null, + 'maximumBytesBilled' => $queryConfig['maximumBytesBilled'] ?? null, + 'location' => $config['jobReference']['location'] ?? null, + 'requestId' => $config['jobReference']['jobId'], + 'jobCreationMode' => self::JOB_CREATION_MODE_OPTIONAL + ]; + } + + /** + * Returns if the JobId was generated by the JobConfigurationTrait + * + * @return bool + */ + private function isJobIdGenerated(): bool + { + return $this->isJobIdGenerated; + } } diff --git a/BigQuery/src/QueryResults.php b/BigQuery/src/QueryResults.php index 4da695acfe59..290c5253aa1b 100644 --- a/BigQuery/src/QueryResults.php +++ b/BigQuery/src/QueryResults.php @@ -68,6 +68,7 @@ class QueryResults implements \IteratorAggregate * @var array Default options to be used for calls to get query results. */ private $queryResultsOptions; + private bool $isStateless = false; /** * @param ConnectionInterface $connection Represents a connection to @@ -102,6 +103,7 @@ public function __construct( ? $info['jobReference']['location'] : $job->identity()['location'] ]; + $this->isStateless = empty($jobId); $this->mapper = $mapper; $this->queryResultsOptions = $queryResultsOptions; } @@ -292,6 +294,10 @@ public function info() */ public function reload(array $options = []) { + if ($this->isStateless) { + return $this->info; + } + return $this->info = $this->connection->getQueryResults( $options + $this->identity ); diff --git a/BigQuery/tests/Snippet/BigQueryClientTest.php b/BigQuery/tests/Snippet/BigQueryClientTest.php index 06f189299285..832a33e31348 100644 --- a/BigQuery/tests/Snippet/BigQueryClientTest.php +++ b/BigQuery/tests/Snippet/BigQueryClientTest.php @@ -154,7 +154,7 @@ public function testRunQuery() { $snippet = $this->snippetFromMethod(BigQueryClient::class, 'runQuery'); $snippet->addLocal('bigQuery', $this->client); - $this->connection->insertJob(Argument::any()) + $this->connection->query(Argument::any()) ->shouldBeCalled() ->willReturn([ 'jobComplete' => false, @@ -179,36 +179,42 @@ public function testRunQueryWithNamedParameters() $expectedQuery = 'SELECT commit FROM `bigquery-public-data.github_repos.commits`' . 'WHERE author.date < @date AND message = @message LIMIT 100'; $this->connection - ->insertJob([ - 'projectId' => self::PROJECT_ID, - 'jobReference' => ['projectId' => self::PROJECT_ID, 'jobId' => self::JOB_ID], - 'configuration' => [ - 'query' => [ - 'parameterMode' => 'named', - 'useLegacySql' => false, - 'queryParameters' => [ - [ - 'name' => 'date', - 'parameterType' => [ - 'type' => 'TIMESTAMP' - ], - 'parameterValue' => [ - 'value' => '1980-01-01 12:15:00.000000+00:00' - ] - ], - [ - 'name' => 'message', - 'parameterType' => [ - 'type' => 'STRING' - ], - 'parameterValue' => [ - 'value' => 'A commit message.' - ] - ] + ->query([ + "query"=> $expectedQuery, + "maxResults"=> null, + "defaultDataset"=> null, + "timeoutMs"=> 10000, + "useQueryCache"=> null, + "useLegacySql"=> false, + "queryParameters"=> [ + [ + "parameterType"=> [ + "type"=> "TIMESTAMP" + ], + "parameterValue"=> [ + "value"=> "1980-01-01 12:15:00.000000+00:00" + ], + "name"=> "date" + ], + [ + "parameterType"=> [ + "type"=> "STRING" + ], + "parameterValue"=> [ + "value"=> "A commit message." ], - 'query' => $expectedQuery + "name"=> "message" ] - ] + ], + "parameterMode"=> "named", + "labels"=> null, + "createSession"=> null, + "maximumBytesBilled"=> null, + "location"=> null, + "requestId"=> "myJobId", + "jobCreationMode"=> "JOB_CREATION_OPTIONAL", + "initialTimeoutMs"=> 10000, + "projectId"=> "my-awesome-project" ]) ->shouldBeCalledTimes(1) ->willReturn([ @@ -233,26 +239,32 @@ public function testRunQueryWithPositionalParameters() $snippet->addLocal('bigQuery', $this->client); $expectedQuery = 'SELECT commit FROM `bigquery-public-data.github_repos.commits` WHERE message = ? LIMIT 100'; $this->connection - ->insertJob([ - 'projectId' => self::PROJECT_ID, - 'jobReference' => ['projectId' => self::PROJECT_ID, 'jobId' => self::JOB_ID], - 'configuration' => [ - 'query' => [ - 'parameterMode' => 'positional', - 'useLegacySql' => false, - 'queryParameters' => [ - [ - 'parameterType' => [ - 'type' => 'STRING' - ], - 'parameterValue' => [ - 'value' => 'A commit message.' - ] - ] + ->query([ + "query"=> "SELECT commit FROM `bigquery-public-data.github_repos.commits` WHERE message = ? LIMIT 100", + "maxResults"=> null, + "defaultDataset"=> null, + "timeoutMs"=> 10000, + "useQueryCache"=> null, + "useLegacySql"=> false, + "queryParameters"=> [ + [ + "parameterType"=> [ + "type"=> "STRING" ], - 'query' => $expectedQuery + "parameterValue"=> [ + "value"=> "A commit message." + ] ] - ] + ], + "parameterMode"=> "positional", + "labels"=> null, + "createSession"=> null, + "maximumBytesBilled"=> null, + "location"=> null, + "requestId"=> "myJobId", + "jobCreationMode"=> "JOB_CREATION_OPTIONAL", + "initialTimeoutMs"=> 10000, + "projectId"=> "my-awesome-project" ]) ->shouldBeCalledTimes(1) ->willReturn([ diff --git a/BigQuery/tests/Unit/BigQueryClientTest.php b/BigQuery/tests/Unit/BigQueryClientTest.php index cb36f6f58c50..6ffa79b16b67 100644 --- a/BigQuery/tests/Unit/BigQueryClientTest.php +++ b/BigQuery/tests/Unit/BigQueryClientTest.php @@ -157,6 +157,179 @@ public function testRunsQuery() $this->assertEquals(self::JOB_ID, $queryResults->identity()['jobId']); } + public function testRunQueryStateless() + { + $client = $this->getClient(); + $query = $client->query(self::QUERY_STRING); + + $this->connection->query(Argument::allOf( + Argument::withEntry('projectId', self::PROJECT_ID), + Argument::withEntry('query', self::QUERY_STRING), + Argument::withEntry('jobCreationMode', 'JOB_CREATION_OPTIONAL') + )) + ->willReturn([ + 'jobComplete' => true, + 'schema' => ['fields' => []], + 'rows' => [] + ]) + ->shouldBeCalledTimes(1); + + $client->___setProperty('connection', $this->connection->reveal()); + $queryResults = $client->runQuery($query); + + $this->assertInstanceOf(QueryResults::class, $queryResults); + $this->assertEquals('', $queryResults->identity()['jobId']); + $this->assertTrue($queryResults->isComplete()); + } + + public function testRunQueryJobQueryEndpointReturnsAJob() + { + $client = $this->getClient(); + $query = $client->query(self::QUERY_STRING); + + $this->connection->query(Argument::allOf( + Argument::withEntry('projectId', self::PROJECT_ID), + Argument::withEntry('query', self::QUERY_STRING), + Argument::withEntry('jobCreationMode', 'JOB_CREATION_OPTIONAL') + )) + ->willReturn([ + 'jobReference' => ['jobId' => self::JOB_ID] + ]) + ->shouldBeCalledTimes(1); + + $this->connection->insertJob(Argument::any()) + ->shouldNotBeCalled(); + + $this->connection->getQueryResults(Argument::allOf( + Argument::withEntry('projectId', self::PROJECT_ID), + Argument::withEntry('jobId', self::JOB_ID) + )) + ->willReturn([ + 'jobReference' => [ + 'jobId' => self::JOB_ID + ], + 'jobComplete' => true + ]) + ->shouldBeCalledTimes(1); + + $client->___setProperty('connection', $this->connection->reveal()); + $queryResults = $client->runQuery($query); + + $this->assertInstanceOf(QueryResults::class, $queryResults); + $this->assertEquals(self::JOB_ID, $queryResults->identity()['jobId']); + $this->assertTrue($queryResults->isComplete()); + } + + /** + * @dataProvider queryOptionsDataProvider + */ + public function testRunQueryNonStatelessUsesStatefulPath(string $optionName, mixed $value) + { + $client = $this->getClient(); + $query = $client->query(self::QUERY_STRING); + $query->$optionName($value); + + $this->connection->query(Argument::any()) + ->shouldNotbeCalled(); + + $this->connection->insertJob(Argument::any()) + ->willReturn([ + 'jobReference' => ['jobId' => self::JOB_ID] + ]) + ->shouldBeCalledTimes(1); + + $this->connection->getQueryResults(Argument::any()) + ->willReturn([ + 'jobReference' => [ + 'jobId' => self::JOB_ID + ], + 'jobComplete' => true + ]) + ->shouldBeCalledTimes(1); + + $client->___setProperty('connection', $this->connection->reveal()); + $queryResults = $client->runQuery($query); + + $this->assertInstanceOf(QueryResults::class, $queryResults); + $this->assertNotEmpty($queryResults->identity()['jobId']); + $this->assertTrue($queryResults->isComplete()); + } + + public function testRunQueryUsingJobUsesStatefulPath() + { + $client = $this->getClient(); + $query = $client->query(self::QUERY_STRING, [ + 'jobReference' => ['jobId' => self::JOB_ID] + ]); + + $this->connection->query(Argument::any()) + ->shouldNotbeCalled(); + + $this->connection->insertJob(Argument::any()) + ->willReturn([ + 'jobReference' => ['jobId' => self::JOB_ID] + ]) + ->shouldBeCalledTimes(1); + + $this->connection->getQueryResults(Argument::any()) + ->willReturn([ + 'jobReference' => [ + 'jobId' => self::JOB_ID + ], + 'jobComplete' => true + ]) + ->shouldBeCalledTimes(1); + + $client->___setProperty('connection', $this->connection->reveal()); + $queryResults = $client->runQuery($query); + + $this->assertInstanceOf(QueryResults::class, $queryResults); + $this->assertNotEmpty($queryResults->identity()['jobId']); + $this->assertTrue($queryResults->isComplete()); + } + + public function testRunQueryStatelessWhichReturnsJob() + { + $client = $this->getClient(); + $query = $client->query(self::QUERY_STRING); + + $this->connection->query(Argument::allOf( + Argument::withEntry('projectId', self::PROJECT_ID), + Argument::withEntry('query', self::QUERY_STRING), + Argument::withEntry('jobCreationMode', 'JOB_CREATION_OPTIONAL') + )) + ->willReturn([ + 'jobReference' => [ + 'jobId' => self::JOB_ID, + 'projectId' => self::PROJECT_ID, + 'location' => self::LOCATION + ], + 'jobComplete' => false, + 'schema' => ['fields' => []], + 'rows' => [] + ]) + ->shouldBeCalledTimes(1); + + $this->connection->getQueryResults(Argument::allOf( + Argument::withEntry('projectId', self::PROJECT_ID), + Argument::withEntry('jobId', self::JOB_ID) + )) + ->willReturn([ + 'jobReference' => [ + 'jobId' => self::JOB_ID + ], + 'jobComplete' => true + ]) + ->shouldBeCalledTimes(1); + + $client->___setProperty('connection', $this->connection->reveal()); + $queryResults = $client->runQuery($query); + + $this->assertInstanceOf(QueryResults::class, $queryResults); + $this->assertEquals(self::JOB_ID, $queryResults->identity()['jobId']); + $this->assertTrue($queryResults->isComplete()); + } + public function testRunsQueryWithRetry() { $client = $this->getClient(); @@ -516,6 +689,55 @@ public function testStartJob($expectedData, $expectedMethod, $returnedData) $this->assertEquals($this->jobResponse, $job->info()); } + public function queryOptionsDataProvider() + { + $destinationTable = $this->getClient() + ->dataset(self::DATASET_ID) + ->table('dummyTable'); + + return [ + [ + 'useLegacySql', true + ], + [ + 'destinationTable', $destinationTable + ], + [ + 'tableDefinitions', ['testDefinition'] + ], + [ + 'createDisposition', 'CREATE_IF_NEEDED' + ], + [ + 'writeDisposition', 'WRITE_TRUNCATE' + ], + [ + 'maximumBillingTier', 1 + ], + [ + 'timePartitioning', ['test'] + ], + [ + 'rangePartitioning', ['testRange'] + ], + [ + 'clustering', ['testClustering'] + ], + [ + 'destinationEncryptionConfiguration', ['ALLOW_FIELD_ADDITION'] + ], + [ + 'schemaUpdateOptions', ['ALLOW_FIELD_ADDITION'] + ], + [ + 'dryRun', true + ], + [ + 'priority', 'BATCH' + ] + ]; + } + public function jobConfigDataProvider() { $expected = [