创建 JSON 映射。
.create table RawEvents ingestion json mapping 'RawEventMapping'
'['
'  {"column":"Event","Properties":{"path":"$"}}'
']'
此命令创建一个映射,将 JSON 根路径 `$` 映射到 `Event` 列。
将数据引入 `RawEvents` 表中。
.ingest into table RawEvents (
    'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
)
with (format='json', ingestionMappingReference='RawEventMapping')
创建 `RawEvents` 表。
// Connect to the engine endpoint. Management commands such as table and mapping
// creation go to kustoUri, not to the "ingest-" endpoint used later for queued ingestion.
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var kustoConnectionStringBuilder =
    new KustoConnectionStringBuilder(kustoUri)
    {
        FederatedSecurity = true,
        InitialCatalog = database,
        UserID = user,
        Password = password,
        Authority = tenantId
    };
var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder);

// RawEvents holds each JSON document whole, in a single column named "Event".
// The name must be "Event" because the JSON mapping below and the
// EventRecordsExpand function both refer to Event.
var table = "RawEvents";
var createTableCommand =
    CslCommandGenerator.GenerateTableCreateCommand(
        table,
        new[]
        {
            Tuple.Create("Event", "System.Object"),
        });
kustoClient.ExecuteControlCommand(createTableCommand);

// Create the JSON mapping. It maps the JSON root path "$" to the Event column.
var tableMapping = "RawEventMapping";
var createMappingCommand =
    CslCommandGenerator.GenerateTableMappingCreateCommand(
        Data.Ingestion.IngestionMappingKind.Json,
        table,
        tableMapping,
        new[]
        {
            new ColumnMapping
            {
                ColumnName = "Event",
                Properties = new Dictionary<string, string>() { { "path", "$" } }
            }
        });
kustoClient.ExecuteControlCommand(createMappingCommand);
此命令创建一个映射,将 JSON 根路径 `$` 映射到 `Event` 列。
将数据引入 `RawEvents` 表中。
// Queued ingestion is submitted to the "ingest-" endpoint, which is separate from
// the engine endpoint used for management commands.
var ingestUri = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var ingestConnectionStringBuilder =
    new KustoConnectionStringBuilder(ingestUri)
    {
        FederatedSecurity = true,
        InitialCatalog = database,
        UserID = user,
        Password = password,
        Authority = tenantId
    };
var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);

// Refer to the previously created mapping by name (tableMapping) instead of
// passing column mappings inline.
var properties =
    new KustoQueuedIngestionProperties(database, table)
    {
        Format = DataSourceFormat.json,
        IngestionMapping = new IngestionMapping()
        {
            IngestionMappingReference = tableMapping
        }
    };
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
创建 `RawEvents` 表。
# Management commands (.create ...) are sent to the engine URI.
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_URI, AAD_TENANT_ID)
KUSTO_CLIENT = KustoClient(KCSB_DATA)
TABLE = "RawEvents"
# The single dynamic column must be named "Event": RawEventMapping maps "$" to Event,
# and the EventRecordsExpand update function reads Event.
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Event: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
创建 JSON 映射。
MAPPING = "RawEventMapping"
# Map the JSON root "$" to the Event column. This uses the same
# {"column":..., "Properties":{"path":...}} form as the KQL and C# versions,
# instead of the legacy top-level "path" key.
CREATE_MAPPING_COMMAND = ".create table " + TABLE + " ingestion json mapping '" + MAPPING + """' '[{"column":"Event","Properties":{"path":"$"}}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
将数据引入 `RawEvents` 表中。
# Ingestion goes through the separate "ingest-" endpoint.
INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(
    INGEST_URI, AAD_TENANT_ID)
INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)

# Each document in simple.json is stored whole in the Event column through MAPPING ("$" -> Event).
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(
    database=DATABASE,
    table=TABLE,
    dataFormat=DataFormat.JSON,
    ingestion_mapping_reference=MAPPING,
)
# NOTE(review): FILE_SIZE is defined outside this snippet; confirm it matches the blob.
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
数据是根据批处理策略聚合的,因此会出现几分钟的延迟。
创建一个新表,该表采用类似于 JSON 输入数据的架构。 我们将对下面的所有示例和引入命令使用此表。
.create table Events (
    Time: datetime,
    Device: string,
    MessageId: string,
    Temperature: double,
    Humidity: double
)
创建 JSON 映射。
.create table Events ingestion json mapping 'FlatEventMapping'
'['
'  {"column":"Time","Properties":{"path":"$.timestamp"}},'
'  {"column":"Device","Properties":{"path":"$.deviceId"}},'
'  {"column":"MessageId","Properties":{"path":"$.messageId"}},'
'  {"column":"Temperature","Properties":{"path":"$.temperature"}},'
'  {"column":"Humidity","Properties":{"path":"$.humidity"}}'
']'
在此映射中,根据表架构的定义,`timestamp` 条目将作为 `datetime` 数据类型引入到 `Time` 列。
将数据引入 `Events` 表中。
.ingest into table Events (
    'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
)
with (format='json', ingestionMappingReference='FlatEventMapping')
文件 simple.json 包含几条以换行分隔的 JSON 记录。格式为 `json`,引入命令中使用的映射是之前创建的 `FlatEventMapping`。
// Create the Events table. It has one typed column per field of the sample JSON records.
// (The extracted fragment had lost its GenerateTableCreateCommand header, so the
// command it executed was undefined.)
var command =
    CslCommandGenerator.GenerateTableCreateCommand(
        "Events",
        new[]
        {
            Tuple.Create("Time", "System.DateTime"),
            Tuple.Create("Device", "System.String"),
            Tuple.Create("MessageId", "System.String"),
            Tuple.Create("Temperature", "System.Double"),
            Tuple.Create("Humidity", "System.Double"),
        });
kustoClient.ExecuteControlCommand(command);
创建 JSON 映射。
// Map each top-level JSON property to its column in the Events table.
var tableMapping = "FlatEventMapping";
var table = "Events";
var command =
    CslCommandGenerator.GenerateTableMappingCreateCommand(
        Data.Ingestion.IngestionMappingKind.Json,
        table,
        tableMapping,
        new[]
        {
            new ColumnMapping() { ColumnName = "Time", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.timestamp" } } },
            new ColumnMapping() { ColumnName = "Device", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.deviceId" } } },
            new ColumnMapping() { ColumnName = "MessageId", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.messageId" } } },
            new ColumnMapping() { ColumnName = "Temperature", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.temperature" } } },
            new ColumnMapping() { ColumnName = "Humidity", Properties = new Dictionary<string, string>() { { MappingConsts.Path, "$.humidity" } } },
        });
kustoClient.ExecuteControlCommand(command);
在此映射中,根据表架构的定义,`timestamp` 条目将作为 `datetime` 数据类型引入到 `Time` 列。
将数据引入 `Events` 表中。
// Reuse the queued ingest client. The referenced mapping (tableMapping) maps each
// JSON field to its own column of `table`.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties =
    new KustoQueuedIngestionProperties(database, table)
    {
        Format = DataSourceFormat.json,
        IngestionMapping = new IngestionMapping()
        {
            IngestionMappingReference = tableMapping
        }
    };
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
文件 simple.json 包含几条以换行分隔的 JSON 记录。格式为 `json`,引入命令中使用的映射是之前创建的 `FlatEventMapping`。
创建一个新表,该表采用类似于 JSON 输入数据的架构。 我们将对下面的所有示例和引入命令使用此表。
# Events gets one typed column per field of the sample JSON records.
TABLE = "Events"
CREATE_TABLE_COMMAND = ".create table {} ({})".format(
    TABLE,
    "Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double",
)
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
创建 JSON 映射。
# One mapping entry per JSON field; each entry's path selects a top-level property.
MAPPING = "FlatEventMapping"
CREATE_MAPPING_COMMAND = ".create table Events ingestion json mapping '{}' '{}'".format(
    MAPPING,
    '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]',
)
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
将数据引入 `Events` 表中。
# Ingest the same sample file, this time mapped field by field using MAPPING
# (FlatEventMapping at this point in the walkthrough).
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(
    database=DATABASE,
    table=TABLE,
    dataFormat=DataFormat.JSON,
    ingestion_mapping_reference=MAPPING,
)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
文件 simple.json 包含几条以换行分隔的 JSON 记录。格式为 `json`,引入命令中使用的映射是之前创建的 `FlatEventMapping`。
将数据引入 `Events` 表中。
// multilined.json is ingested with the multijson format, presumably because its records
// span several lines. The flat field-to-column mapping is reused unchanged.
var tableMapping = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties =
    new KustoQueuedIngestionProperties(database, table)
    {
        Format = DataSourceFormat.multijson,
        IngestionMapping = new IngestionMapping()
        {
            IngestionMappingReference = tableMapping
        }
    };
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
将数据引入 `Events` 表中。
# multilined.json is read with the MULTIJSON format (its records presumably span
# several lines). The flat field-to-column mapping is reused unchanged.
MAPPING = "FlatEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json'
INGESTION_PROPERTIES = IngestionProperties(
    database=DATABASE,
    table=TABLE,
    dataFormat=DataFormat.MULTIJSON,
    ingestion_mapping_reference=MAPPING,
)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
引入包含数组的 JSON 记录
数组数据类型是按顺序排列的值集合。JSON 数组的引入由更新策略来完成:JSON 先按原样引入到中间表,然后更新策略针对 `RawEvents` 表运行某个预定义的函数,并将结果重新引入到目标表。我们将引入采用以下结构的数据:
{
    "records":
    [
        {
            "timestamp": "2019-05-02 15:23:50.0000000",
            "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
            "messageId": "7f316225-839a-4593-92b5-1812949279b3",
            "temperature": 31.0301639051317,
            "humidity": 62.0791099602725
        },
        {
            "timestamp": "2019-05-02 15:23:51.0000000",
            "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
            "messageId": "57de2821-7581-40e4-861e-ea3bde102364",
            "temperature": 33.7529423105311,
            "humidity": 75.4787976739364
        }
    ]
}
使用 `mv-expand` 运算符创建一个供更新策略(update policy)使用的函数,用于展开 `records` 集合,使集合中的每个值各占一行。我们将使用 `RawEvents` 表作为源表,使用 `Events` 表作为目标表。
.create function EventRecordsExpand() {
    RawEvents
    // One output row per element of the "records" array inside each raw document.
    | mv-expand records = Event.records
    // Output column names and types must match the Events table schema.
    | project
        Time = todatetime(records["timestamp"]),
        Device = tostring(records["deviceId"]),
        MessageId = tostring(records["messageId"]),
        Temperature = todouble(records["temperature"]),
        Humidity = todouble(records["humidity"])
}
该函数返回的架构必须与目标表的架构相匹配。使用 `getschema` 运算符检查架构。
// List the function's output columns and types to compare them with the Events table.
EventRecordsExpand()
| getschema
将更新策略添加到目标表。此策略将自动对 `RawEvents` 中间表中的任何新引入数据运行查询,并将结果引入到 `Events` 表中。定义零保留期策略,以避免持久保存中间表。
.alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'

.alter-merge table RawEvents policy retention softdelete = 0s
将数据引入 `RawEvents` 表中。
.ingest into table RawEvents (
    'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json'
)
with (format='multijson', ingestionMappingReference='RawEventMapping')
检查 `Events` 表中的数据。
// Rows produced by the update policy (one per element of Event.records) appear here,
// alongside any rows ingested into Events earlier.
Events
使用 `mv-expand` 运算符创建一个更新函数,用于展开 `records` 集合,使集合中的每个值各占一行。我们将使用 `RawEvents` 表作为源表,使用 `Events` 表作为目标表。
// Create EventRecordsExpand in the "UpdateFunctions" folder. mv-expand must target
// Event.records (the array inside each raw document), not Event itself, so that each
// array element becomes its own row shaped like the Events table.
var command =
    CslCommandGenerator.GenerateCreateFunctionCommand(
        "EventRecordsExpand",
        "UpdateFunctions",
        string.Empty,
        null,
        @"RawEvents
    | mv-expand records = Event.records
    | project
        Time = todatetime(records['timestamp']),
        Device = tostring(records['deviceId']),
        MessageId = tostring(records['messageId']),
        Temperature = todouble(records['temperature']),
        Humidity = todouble(records['humidity'])",
        ifNotExists: false);
kustoClient.ExecuteControlCommand(command);
该函数返回的架构必须与目标表的架构相匹配。
将更新策略添加到目标表。此策略将针对 `RawEvents` 中间表中任何新引入的数据自动运行查询,并将查询结果引入到 `Events` 表中。定义零保留期策略,以避免持久保存中间表。
// Attach the update policy to Events. Every ingestion into RawEvents runs
// EventRecordsExpand() and writes its output into Events.
// The policy JSON must use double quotes (doubled inside this verbatim C# string),
// because a single quote would terminate the @'...' KQL string literal.
var command =
    @".alter table Events policy update @'[{""Source"": ""RawEvents"", ""Query"": ""EventRecordsExpand()"", ""IsEnabled"": ""True""}]'";
kustoClient.ExecuteControlCommand(command);

// Zero retention on the intermediate table, so RawEvents does not keep the raw documents.
var retentionCommand = ".alter-merge table RawEvents policy retention softdelete = 0s";
kustoClient.ExecuteControlCommand(retentionCommand);
将数据引入 `RawEvents` 表中。
// Ingest array.json into the intermediate RawEvents table. Each document is stored
// whole in Event via RawEventMapping, and the update policy on Events expands it.
var table = "RawEvents";
var tableMapping = "RawEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var properties =
    new KustoQueuedIngestionProperties(database, table)
    {
        Format = DataSourceFormat.multijson,
        IngestionMapping = new IngestionMapping()
        {
            IngestionMappingReference = tableMapping
        }
    };
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
检查 `Events` 表中的数据。
使用 `mv-expand` 运算符创建一个更新函数,用于展开 `records` 集合,使集合中的每个值各占一行。我们将使用 `RawEvents` 表作为源表,使用 `Events` 表作为目标表。
# Create the update function. The string must start on the same line as "=":
# a bare "X =" followed by a newline is a SyntaxError.
# mv-expand targets Event.records (the array inside each raw document), so each
# array element becomes its own row shaped like the Events table.
CREATE_FUNCTION_COMMAND = '''.create function EventRecordsExpand() {
    RawEvents
    | mv-expand records = Event.records
    | project
        Time = todatetime(records["timestamp"]),
        Device = tostring(records["deviceId"]),
        MessageId = tostring(records["messageId"]),
        Temperature = todouble(records["temperature"]),
        Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
该函数返回的架构必须与目标表的架构相匹配。
将更新策略添加到目标表。此策略将针对 `RawEvents` 中间表中任何新引入的数据自动运行查询,并将查询结果引入到 `Events` 表中。定义零保留期策略,以避免持久保存中间表。
# Attach the update policy to Events. Every ingestion into RawEvents runs
# EventRecordsExpand() and writes its output into Events.
# The policy JSON uses double quotes, because a single quote would terminate the
# @'...' KQL string literal.
CREATE_UPDATE_POLICY_COMMAND = """.alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_UPDATE_POLICY_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])

# Zero retention on the intermediate table, so RawEvents does not keep the raw documents.
CREATE_RETENTION_POLICY_COMMAND = ".alter-merge table RawEvents policy retention softdelete = 0s"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_RETENTION_POLICY_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
将数据引入 `RawEvents` 表中。
# array.json is stored whole in RawEvents (RawEventMapping maps "$" to Event);
# the update policy on Events then expands its "records" array into rows.
TABLE = "RawEvents"
MAPPING = "RawEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json'
INGESTION_PROPERTIES = IngestionProperties(
    database=DATABASE,
    table=TABLE,
    dataFormat=DataFormat.MULTIJSON,
    ingestion_mapping_reference=MAPPING,
)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
检查 `Events` 表中的数据。