Skip to content
51 changes: 45 additions & 6 deletions BigQuery/src/BigQueryClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,44 @@ public function runQuery(JobConfigurationInterface $query, array $options = [])
], $options);
$queryResultsOptions['initialTimeoutMs'] = 10000;

$queryResults = $this->startQuery(
$query,
$options
)->queryResults($queryResultsOptions + $options);
if ($query instanceof QueryJobConfiguration && $query->isStateless()) {
$queryRequest = $query->toQueryRequest();

// The flattened notation does not work for POST request without special handling.
// Propagate it for backwards compatibility.
if (isset($queryResultsOptions['formatOptions.useInt64Timestamp'])) {
$useInt64 = $this->pluck('formatOptions.useInt64Timestamp', $queryResultsOptions, false);
$queryResultsOptions['formatOptions']['useInt64Timestamp'] = $useInt64;
}

$statelessArgs = $queryRequest + $queryResultsOptions + [
'projectId' => $this->projectId
] + $options;

if (!isset($statelessArgs['timeoutMs'])) {
$statelessArgs['timeoutMs'] = $statelessArgs['initialTimeoutMs'];
}

$response = $this->connection->query($statelessArgs);

if ($response['jobComplete'] ?? false) {
return new QueryResults(
$this->connection,
'',
$this->projectId,
$response,
$this->mapper,
$this->createJob([], ''), // create an empty job
$queryResultsOptions
);
}

$job = $this->createJob($response, $response['jobReference']['jobId']);
} else {
$job = $this->startQuery($query, $options);
}

$queryResults = $job->queryResults($queryResultsOptions + $options);
$queryResults->waitUntilComplete();
return $queryResults;
}
Expand Down Expand Up @@ -772,12 +806,17 @@ public function startJob(JobConfigurationInterface $config, array $options = [])
$response = $this->connection->insertJob($config);
}

return $this->createJob($response, $config['jobReference']['jobId']);
}

private function createJob(array $info, string $jobId)
{
return new Job(
$this->connection,
$config['jobReference']['jobId'],
$jobId,
$this->projectId,
$this->mapper,
$response
$info
);
}

Expand Down
95 changes: 95 additions & 0 deletions BigQuery/src/QueryJobConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ class QueryJobConfiguration implements JobConfigurationInterface
{
use JobConfigurationTrait;

private const JOB_CREATION_MODE_OPTIONAL = 'JOB_CREATION_OPTIONAL';

/**
* @var ValueMapper Maps values between PHP and BigQuery.
*/
private $mapper;
private bool $isJobIdGenerated = false;

/**
* @param ValueMapper $mapper Maps values between PHP and BigQuery.
Expand All @@ -58,6 +61,13 @@ public function __construct(
$this->mapper = $mapper;
$this->jobConfigurationProperties($projectId, $config, $location);

if (!isset($config['jobReference']['jobId'])) {
// If the user did not submit a jobId to the configuration array, the library will create a JobId before
// the request is sent. This is used to keep track if it is the user who set the JobId or the library
/// for stateless queries.
$this->isJobIdGenerated = true;
}

if (!isset($this->config['configuration']['query']['useLegacySql'])) {
$this->config['configuration']['query']['useLegacySql'] = false;
}
Expand Down Expand Up @@ -593,4 +603,89 @@ public function writeDisposition($writeDisposition)

return $this;
}

/**
* Returns true if the current configuration is compatible with the stateless query API.
* False if not
*
* @internal
* @return bool
*/
public function isStateless(): bool
{
$config = $this->config;
$queryConfig = $config['configuration']['query'];

if (isset($queryConfig['destinationTable']) ||
isset($queryConfig['tableDefinitions']) ||
isset($queryConfig['createDisposition']) ||
isset($queryConfig['writeDisposition']) ||
($queryConfig['useLegacySql'] ?? false) ||
isset($queryConfig['maximumBillingTier']) ||
isset($queryConfig['timePartitioning']) ||
isset($queryConfig['rangePartitioning']) ||
isset($queryConfig['clustering']) ||
isset($queryConfig['destinationEncryptionConfiguration']) ||
isset($queryConfig['schemaUpdateOptions']) ||
isset($queryConfig['jobTimeoutMs'])
) {
return false;
}

if (isset($queryConfig['priority']) && $queryConfig['priority'] !== 'INTERACTIVE') {
return false;
}

if ($config['configuration']['dryRun'] ?? false) {
return false;
}

// Creating a jobConfiguration from the library sets the JobId always meaning we do not have a way
// to determine if this jobId was set by the user or our library.
// We check if this was autogenerated to circumvent this issue.
if (isset($config['jobReference']['jobId']) && !$this->isJobIdGenerated()) {
return false;
}

return true;
}

/**
* Returns an array representation of a QueryRequest:
* [QueryRequest](https://docs.cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest)
*
* @return array
*/
public function toQueryRequest(): array
{
$config = $this->config;
$queryConfig = $config['configuration']['query'];

return [
'query' => $queryConfig['query'],
'maxResults' => $queryConfig['maxResults'] ?? null,
'defaultDataset' => $queryConfig['defaultDataset'] ?? null,
'timeoutMs' => $queryConfig['timeoutMs'] ?? null,
'useQueryCache' => $queryConfig['useQueryCache'] ?? null,
'useLegacySql' => false,
'queryParameters' => $queryConfig['queryParameters'] ?? null,
'parameterMode' => $queryConfig['parameterMode'] ?? null,
'labels' => $config['configuration']['labels'] ?? null,
'createSession' => $queryConfig['createSession'] ?? null,
'maximumBytesBilled' => $queryConfig['maximumBytesBilled'] ?? null,
'location' => $config['jobReference']['location'] ?? null,
'requestId' => $config['jobReference']['jobId'],
'jobCreationMode' => self::JOB_CREATION_MODE_OPTIONAL
];
}

/**
* Returns if the JobId was generated by the JobConfigurationTrait
*
* @return bool
*/
private function isJobIdGenerated(): bool
{
return $this->isJobIdGenerated;
}
}
6 changes: 6 additions & 0 deletions BigQuery/src/QueryResults.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class QueryResults implements \IteratorAggregate
* @var array Default options to be used for calls to get query results.
*/
private $queryResultsOptions;
private bool $isStateless = false;

/**
* @param ConnectionInterface $connection Represents a connection to
Expand Down Expand Up @@ -102,6 +103,7 @@ public function __construct(
? $info['jobReference']['location']
: $job->identity()['location']
];
$this->isStateless = empty($jobId);
$this->mapper = $mapper;
$this->queryResultsOptions = $queryResultsOptions;
}
Expand Down Expand Up @@ -292,6 +294,10 @@ public function info()
*/
public function reload(array $options = [])
{
if ($this->isStateless) {
return $this->info;
}

return $this->info = $this->connection->getQueryResults(
$options + $this->identity
);
Expand Down
106 changes: 59 additions & 47 deletions BigQuery/tests/Snippet/BigQueryClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public function testRunQuery()
{
$snippet = $this->snippetFromMethod(BigQueryClient::class, 'runQuery');
$snippet->addLocal('bigQuery', $this->client);
$this->connection->insertJob(Argument::any())
$this->connection->query(Argument::any())
->shouldBeCalled()
->willReturn([
'jobComplete' => false,
Expand All @@ -179,36 +179,42 @@ public function testRunQueryWithNamedParameters()
$expectedQuery = 'SELECT commit FROM `bigquery-public-data.github_repos.commits`' .
'WHERE author.date < @date AND message = @message LIMIT 100';
$this->connection
->insertJob([
'projectId' => self::PROJECT_ID,
'jobReference' => ['projectId' => self::PROJECT_ID, 'jobId' => self::JOB_ID],
'configuration' => [
'query' => [
'parameterMode' => 'named',
'useLegacySql' => false,
'queryParameters' => [
[
'name' => 'date',
'parameterType' => [
'type' => 'TIMESTAMP'
],
'parameterValue' => [
'value' => '1980-01-01 12:15:00.000000+00:00'
]
],
[
'name' => 'message',
'parameterType' => [
'type' => 'STRING'
],
'parameterValue' => [
'value' => 'A commit message.'
]
]
->query([
"query"=> $expectedQuery,
"maxResults"=> null,
"defaultDataset"=> null,
"timeoutMs"=> 10000,
"useQueryCache"=> null,
"useLegacySql"=> false,
"queryParameters"=> [
[
"parameterType"=> [
"type"=> "TIMESTAMP"
],
"parameterValue"=> [
"value"=> "1980-01-01 12:15:00.000000+00:00"
],
"name"=> "date"
],
[
"parameterType"=> [
"type"=> "STRING"
],
"parameterValue"=> [
"value"=> "A commit message."
],
'query' => $expectedQuery
"name"=> "message"
]
]
],
"parameterMode"=> "named",
"labels"=> null,
"createSession"=> null,
"maximumBytesBilled"=> null,
"location"=> null,
"requestId"=> "myJobId",
"jobCreationMode"=> "JOB_CREATION_OPTIONAL",
"initialTimeoutMs"=> 10000,
"projectId"=> "my-awesome-project"
])
->shouldBeCalledTimes(1)
->willReturn([
Expand All @@ -233,26 +239,32 @@ public function testRunQueryWithPositionalParameters()
$snippet->addLocal('bigQuery', $this->client);
$expectedQuery = 'SELECT commit FROM `bigquery-public-data.github_repos.commits` WHERE message = ? LIMIT 100';
$this->connection
->insertJob([
'projectId' => self::PROJECT_ID,
'jobReference' => ['projectId' => self::PROJECT_ID, 'jobId' => self::JOB_ID],
'configuration' => [
'query' => [
'parameterMode' => 'positional',
'useLegacySql' => false,
'queryParameters' => [
[
'parameterType' => [
'type' => 'STRING'
],
'parameterValue' => [
'value' => 'A commit message.'
]
]
->query([
"query"=> "SELECT commit FROM `bigquery-public-data.github_repos.commits` WHERE message = ? LIMIT 100",
"maxResults"=> null,
"defaultDataset"=> null,
"timeoutMs"=> 10000,
"useQueryCache"=> null,
"useLegacySql"=> false,
"queryParameters"=> [
[
"parameterType"=> [
"type"=> "STRING"
],
'query' => $expectedQuery
"parameterValue"=> [
"value"=> "A commit message."
]
]
]
],
"parameterMode"=> "positional",
"labels"=> null,
"createSession"=> null,
"maximumBytesBilled"=> null,
"location"=> null,
"requestId"=> "myJobId",
"jobCreationMode"=> "JOB_CREATION_OPTIONAL",
"initialTimeoutMs"=> 10000,
"projectId"=> "my-awesome-project"
])
->shouldBeCalledTimes(1)
->willReturn([
Expand Down
Loading
Loading