Write SQL queries
Transform data from the CDF staging area into a data model using built-in and custom Spark SQL queries. Select Switch to SQL editor on the Transform data page to create a transformation in Spark SQL. This article describes the queries and explains how you can load data incrementally.
The SQL editor offers code completion, built-in Spark SQL functions, and Cognite custom SQL functions.
Your changes won't be kept if you switch from the SQL editor to the mapping editor.
Read from a CDF staging table
To select data from a CDF staging table, use the syntax database-name.table-name:
select * from database-name.table-name
If your database or table name contains special characters, enclose the name in backticks, for example `my-db`.`my table`.
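For instance, assuming a hypothetical staging database named my-db with a table named my table:
select * from `my-db`.`my table`
-- Backticks let Spark SQL parse names containing hyphens or spaces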
Avoid schema inference
Transformations infer schemas in the CDF staging table, but this process only uses a subset of all the rows in the table. You can avoid schema inference and write a schema fitted to your data.
To avoid schema inference:
select * from cdf_raw("database-name", "table-name")
This returns data with the schema key:STRING, lastUpdatedTime:TIMESTAMP, columns:STRING, where the columns string contains the JSON value encoded as a string.
Here's an example of how to enforce a user-defined schema:
select
  get_json_object(columns, '$.externalId') AS externalId,
  timestamp(get_json_object(columns, '$.timestamp')) AS timestamp,
  double(get_json_object(columns, '$.value')) AS value
from
  cdf_raw("database-name", "table-name")
Read from other CDF resource types
To select other CDF resource types, use the syntax _cdf.resource_type:
select * from _cdf.events
The supported resource types are:
_cdf.events
_cdf.assets
_cdf.files
_cdf.timeseries
_cdf.sequences
_cdf_sequences.<sequence_externalId>
_cdf.datapoints
_cdf.stringdatapoints
_cdf.labels
_cdf.relationships
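For sequence rows, reference the sequence by its external ID through _cdf_sequences. A minimal sketch, assuming a sequence with the hypothetical external ID my_sequence:
select * from _cdf_sequences.my_sequence
-- Returns the rows of the sequence whose external ID is 'my_sequence' (a hypothetical ID)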
Load data incrementally
When reading from staging tables, you probably want to transform only the data that has changed since the last transformation job ran. To achieve this, you can filter on the lastUpdatedTime column to query for the rows that have changed after a specific timestamp.
When filtering on lastUpdatedTime, the filter is pushed down to the RAW service itself, so this query can be performed efficiently. For example:
select * from mydb.mytable where lastUpdatedTime > to_timestamp(123456)
Instead of encoding the timestamp directly in the query and manually keeping it up to date every time new data has been processed, you can use the is_new function. This function returns true when a row has changed since the last time the transformation was run and false otherwise.
The first time you run a transformation using the query below, all the rows of mytable will be processed:
select * from mydb.mytable where is_new("mydb_mytable", lastUpdatedTime)
If the transformation completes successfully, the second run will only process rows that have changed since the first run.
If the transformation fails, is_new filters the same rows the next time the transformation is run. This ensures that there is no data loss in the transformation from source to destination.
Incremental load is disabled when previewing query results. That is, is_new will always return true for all rows.
Each is_new filter is identified by a name (for example, "mydb_mytable") and can be set to any constant string. This allows you to differentiate between multiple calls to is_new in the same query and use is_new to filter on multiple tables. To easily identify the different filters, we recommend that you use the name of the table as the name of the is_new filter.
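As a sketch, a query joining two hypothetical staging tables, pumps and valves, could use one is_new filter per table, each with its own name:
select *
from mydb.pumps p join mydb.valves v on p.key = v.pumpKey
where is_new("mydb_pumps", p.lastUpdatedTime)
or is_new("mydb_valves", v.lastUpdatedTime)
-- 'pumps', 'valves', and 'pumpKey' are hypothetical names; each filter tracks changes for its own table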
Backfill
To process all the data even if it hasn't changed since the last transformation, change the name of the is_new filter, for example, by adding a postfix with an incrementing number (e.g. "mydb_mytable_1").
This is especially useful when the logic of the query changes and data that has already been imported needs to be updated accordingly.
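For example, if the incremental query used the filter name "mydb_mytable", a backfill run could simply bump the name:
select * from mydb.mytable where is_new("mydb_mytable_1", lastUpdatedTime)
-- The renamed filter has no history, so all rows are processed again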
Custom SQL functions
In addition to the built-in Spark SQL functions, we also provide a set of custom SQL functions to help you write efficient transformations.
When a function expects var_args, it allows a variable number of arguments of any type, including star (*).
get_names
- get_names(var_args): Array[String]
Returns an array of the field names of a struct or row.
Example
select get_names(*) from mydb.mytable
-- Returns the column names of 'mydb.mytable'
select get_names(some_struct.*) from mydb.mytable
-- Returns the field names of 'some_struct'
cast_to_strings
- cast_to_strings(var_args): Array[String]
Casts the arguments to an array of strings. It handles array, struct, and map types by casting them to JSON strings.
Example
select cast_to_strings(*) from mydb.mytable
-- Returns the values of all columns in 'mydb.mytable' as strings
to_metadata
- to_metadata(var_args): Map[String, String]
Creates a metadata-compatible type from the arguments. In practice, it does map_from_arrays(get_names(var_args), cast_to_strings(var_args)). Use this function when you want to transform your columns or structures into a format that fits the metadata field in CDF.
Example
select to_metadata(*) from mydb.mytable
-- Creates a metadata structure from all the columns found in 'mydb.mytable'
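As a sketch of typical usage, assuming the hypothetical staging columns externalId, name, manufacturer, and serialNumber, you could keep the first two as regular columns and fold the rest into the metadata field:
select
  externalId,
  name,
  to_metadata(manufacturer, serialNumber) as metadata
from
  mydb.mytable
-- 'manufacturer' and 'serialNumber' are hypothetical columns; their values end up as string entries in the map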
to_metadata_except
- to_metadata_except(excludeFilter: Array[String], var_args)
Returns a metadata structure (Map[String, String]) where keys matching the strings in excludeFilter are excluded from var_args.
Use this function when you want to put most, but not all, columns into metadata, for example to_metadata_except(array("someColumnToExclude"), *).
Example
select to_metadata_except(array("myCol"), myCol, testCol) from mydb.mytable
-- Creates a map where myCol is filtered out.
-- The result in this case will be Map("testCol" -> testCol.value.toString)
asset_ids
Attempts to find asset names under the given criteria and returns the IDs of the matching assets. Four variations are available.
Attempts to find the given assetNames in all assets.
- asset_ids(assetNames: Array[String]): Array[BigInt]
Attempts to find assetNames in the asset hierarchy with rootAssetName as their root asset.
- asset_ids(assetNames: Array[String], rootAssetName: String): Array[BigInt]
Attempts to find assetNames that belong to the datasetIds.
- asset_ids(assetNames: Array[String], datasetIds: Array[Long]): Array[BigInt]
Attempts to find assetNames that belong to the datasetIds under the rootAssetName.
- asset_ids(assetNames: Array[String], rootAssetName: String, datasetIds: Array[Long]): Array[BigInt]
See Assets for more information about assets in CDF.
The entire job will be aborted if asset_ids() does not find any matching assets.
Example
select asset_ids(array("PV10", "PV11"))
select asset_ids(array("PV10", "PV11"), "MyBoat")
select asset_ids(array("PV10", "PV11"), array(254343, 23433, 54343))
select asset_ids(array("PV10", "PV11"), array(dataset_id("pv-254343-ext-id"), 23433, 54343))
select asset_ids(array("PV10", "PV11"), "MyBoat", array(dataset_id("pv-254343-ext-id"), 23433, 54343))