Script Collectors

Overview

DataBlend supports users wishing to create scripts within the DataBlend Script Collector. The Script Collector is a unique connector in that the connection is established entirely within the DataBlend application. It runs arbitrary JavaScript with tie-ins to .NET provided by Jint, and it can collect data both from other DataBlend Collectors and from external sources. Any asynchronous .NET methods should use .Result to wait for the method to complete. Establishing the connection is simpler than for other DataBlend connections because this Collector does not require a Credential to be created first.

Configuration

Field          Required/Optional   Comments
Type           Required            Script
Name           Required            Free text
Data Source    Required            Select the desired Data Source from the drop-down menu.
Schema         Required            Select the desired schema from the drop-down menu.
Script         Required            Enter the script used to collect data from the source.

Tasks

Scripts can also be run as standalone tasks from within the Script Collector.

Limits

Scripts may run for at most one hour before being terminated. A script also may not allocate more than 2 GB of memory in total during its execution.

Jobs

Parameters

Job parameters are interpolated into the script text before the script runs, so referencing a parameter's placeholder injects the parameter's value directly into the script. For example, if a job has a parameter called “name” with a value of “test”, a call to log whose argument contains the “name” placeholder will write “test” to a log entry.

Context

All job types listed below have the same three objects injected automatically into the JavaScript context:

job - The current job execution object. The object is the same as what would be returned by the API for that job. For example job.parent.name would return the name of the Collector when running a collection job.

log(string) - A function used to output messages to the job logs.

checkCancellation() - Asynchronous function that can be called at any time to see if the user has cancelled the job. It should be called between long-running steps (e.g. loading data from multiple API calls); otherwise the job won't be cancelled until the very end.
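As a sketch of how checkCancellation might be woven between long-running steps: the log and checkCancellation stubs below are stand-ins for the objects DataBlend injects (here synchronous and never cancelling), and the batch loop is purely illustrative.

```javascript
// Stand-ins for the objects DataBlend injects into the script context.
const log = (message) => console.log(message);
const checkCancellation = () => ({ Result: false }); // pretend the user never cancels

const batches = [[1, 2], [3, 4], [5, 6]];
let processed = 0;

for (const batch of batches) {
    // Check between long-running steps so cancellation takes effect promptly,
    // rather than only after all batches have been processed.
    if (checkCancellation().Result) {
        log('Job cancelled; stopping early.');
        break;
    }
    processed += batch.length; // placeholder for real work, e.g. an API call per batch
}

log('Processed ' + processed + ' records');
```

In the real context the function is asynchronous, so a script would wait on it with .Result as described above.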

Additional context values unique to each job type, if any, are outlined below.

Collectors

schemaName - The schema name set in the parent collector.

createSchema(object) - Shortcut function used to create a schema from a JSON object.

startUpload(schema, collection, totalRecords) - Function used to signal the start of ingestion of data. The schema can be created using the createSchema function, and the collection parameter is the job object mentioned above. totalRecords should only be used if the total number of records is known before all the data is uploaded; otherwise use null. This function returns a CollectionItem used in the next two functions.

sendData(collectionItem, records) - Function to upload partial or all data. The collectionItem parameter is the value returned from startUpload. records is an array of JSON objects to store. This method returns a CollectionItem that should be used for subsequent calls to sendData and finishUpload.

finishUpload(collectionItem) - Function used to signal the end of the stream of data. collectionItem is the value returned from sendData, or from startUpload if no data was sent.
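The three functions above are designed to be chained: startUpload begins the stream, each sendData call returns the CollectionItem to pass to the next call, and finishUpload closes the stream. A minimal sketch of a chunked upload follows; the stubs and sample records are illustrative, not DataBlend's implementation, and the .Result waits used in the real asynchronous context are omitted here.

```javascript
// Stubs standing in for the functions DataBlend injects into a collector script.
const createSchema = (obj) => ({ fields: Object.keys(obj) });
const startUpload = (schema, collection, totalRecords) => ({ uploaded: 0 });
const sendData = (item, records) => ({ uploaded: item.uploaded + records.length });
const finishUpload = (item) => item;

const records = [
    { id: 1, name: 'alpha' },
    { id: 2, name: 'beta' },
    { id: 3, name: 'gamma' },
];

const schema = createSchema(records[0]);
// totalRecords is known up front here; pass null when it is not.
let collectionItem = startUpload(schema, null, records.length);

// Upload in chunks; each call returns the CollectionItem to use for the next call.
const chunkSize = 2;
for (let i = 0; i < records.length; i += chunkSize) {
    collectionItem = sendData(collectionItem, records.slice(i, i + chunkSize));
}
collectionItem = finishUpload(collectionItem);
console.log(collectionItem.uploaded);
```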

Data Targets

records - Array of objects from the query execution converted into JSON objects.
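For instance, a Data Target script might walk the injected records array, logging each row and accumulating a field. The records array and log function are stubbed below with sample rows; in a real script both arrive from the job context.

```javascript
// Stubs for the values DataBlend injects into a Data Target script context.
const records = [
    { account: '1000', amount: 125.5 },
    { account: '2000', amount: 74.5 },
];
const log = (message) => console.log(message);

// Sum a field across all rows and write one log line per record.
let total = 0;
for (const record of records) {
    total += record.amount;
    log(record.account + ': ' + record.amount);
}
log('Total: ' + total);
```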

Tasks

No additional values.

Additional .NET objects available

In addition to the standard JavaScript objects, a few other libraries are available to augment JavaScript's native capabilities.

HttpClient

Any objects found within the System.Net.Http assembly can be used.

const http = importNamespace('System.Net.Http');
const client = new http.HttpClient();
const message = new http.HttpRequestMessage(http.HttpMethod.Get, 'some url');
const response = client.SendAsync(message).Result;
const content = JSON.parse(response.Content.ReadAsStringAsync().Result);

SftpClient

SFTP support is provided by Renci.SshNet

const ssh = importNamespace('Renci.SshNet');
const client = new ssh.SftpClient(new ssh.ConnectionInfo('host', 22, 'username', [new ssh.PasswordAuthenticationMethod('username', 'password')]));
client.Connect();

JSON

JSON support is provided by Newtonsoft.Json

const json = importNamespace('Newtonsoft.Json');
json.JsonConvert.SerializeObject(someObject);

Crypto

Cryptographic support is provided by System.Security.Cryptography

const sys = importNamespace('System');
const crypto = importNamespace('System.Security.Cryptography');
const text = importNamespace('System.Text');
const hmac = new crypto.HMACSHA256(text.Encoding.UTF8.GetBytes('key'));
const hash = hmac.ComputeHash(text.Encoding.UTF8.GetBytes('test'));
const hex = sys.BitConverter.ToString(hash);
log(hex.replaceAll('-', '').toLowerCase());

XML

XML support is provided by System.Xml

const xml = importNamespace('System.Xml');
const text = '<root attr="test"><test>value1</test><test>value2</test></root>';
const doc = new xml.XmlDocument();
doc.LoadXml(text);

CSV

CSV support is provided by CsvHelper

const sys = importNamespace('System');
const global = importNamespace('System.Globalization');
const io = importNamespace('System.IO');
const csv = importNamespace('CsvHelper');
const stream = new io.MemoryStream();
// load data into stream
stream.Seek(0, io.SeekOrigin.Begin);
const streamReader = new io.StreamReader(stream);
const csvReader = new csv.CsvReader(streamReader, global.CultureInfo.InvariantCulture, false);
const records = csvReader.GetRecords(new sys.Object().GetType()).ToList();

Microsoft Graph

Microsoft Graph support is provided by Microsoft.Graph

DataBlend API Clients

Most DataBlend objects have built-in API clients injected into the job context. For example, listing the first ten collectors available to the user running the job can be accomplished via the Collectors client.

const json = importNamespace('Newtonsoft.Json');
const collectors = Collectors
    .Search({limit: 10})
    .Result
    .Results
    .Select(c => c.Name);
log(json.JsonConvert.SerializeObject(collectors));

List of available clients

AgentPings
Agents
CollectionItems
CollectionLogs
Collections
Collectors
Credentials
CredentialTests
DataQualityReportExecutionLogs
DataQualityReportExecutions
DataQualityReports
DatasinkExecutionLogs
DatasinkExecutions
Datasinks
Datasources
DatasourceSchemas
PluginSchemas
Queries
QueryExecutionLogs
QueryExecutions
SimpleTaskExecutionLogs
SimpleTaskExecutions
SimpleTasks
Streams
StreamUploadLogs
StreamUploads
UnpivotExecutionLogs
UnpivotExecutions
Unpivots
UserProfiles
WorkflowExecutionLogs
WorkflowExecutions
Workflows
WorkflowTasks

Examples

Get Most Recent Query Collector

const getMostRecentQueryExecutionByQueryId = (id) => {
    const search = importNamespace('Datablend.Search');
    const models = importNamespace('Datablend.Api.Models');
    const QueryExecutionSearch = search.Search(models.QueryExecution);
    const OrderList = System.Collections.Generic.List(search.Order);
    let executionSearch = new QueryExecutionSearch();
    executionSearch.Limit = 1;
    executionSearch.Orders = new OrderList();
    executionSearch.Orders.Add(search.Order.Descending('created'));
    executionSearch.Predicate = search.Predicates.Junction.And(
        search.Predicates.Property.EqualTo('parent.id', id),
        search.Predicates.Property.EqualTo('state', 'Complete'));
    executionSearch = QueryExecutions.Search(executionSearch).Result;
    return executionSearch.Results[0];
};

const getRecordsFromQueryExecution = (id) => {
    const sys = importNamespace('System');
    const global = importNamespace('System.Globalization');
    const io = importNamespace('System.IO');
    const csv = importNamespace('CsvHelper');
    const stream = QueryExecutions.Results(new sys.Guid(id)).Result;
    const streamReader = new io.StreamReader(stream);
    const csvReader = new csv.CsvReader(streamReader, global.CultureInfo.InvariantCulture, false);
    return csvReader.GetRecords(new sys.Object().GetType()).ToList();
};

const queryId = '123456789-c111-1111-1111-111122334455';
const queryExecution = getMostRecentQueryExecutionByQueryId(queryId);
const records = getRecordsFromQueryExecution(queryExecution.id);
log(records[0].first_name);

Collecting JSON Data

const http = importNamespace('System.Net.Http');
const client = new http.HttpClient();
const response = client.GetAsync('https://dummyjson.com/products').Result;
const records = JSON.parse(response.Content.ReadAsStringAsync().Result).products;
var schema = createSchema(schemaName, records[0]);
let collectionItem = startUpload(schema, job, records.length).Result;
collectionItem = sendData(collectionItem, records).Result;
finishUpload(collectionItem).Result;