以下代码可以只关注StreamLoad具体实现。
1.创建StreamLoad返回值Model
/// <summary>
/// Result of a Doris Stream Load request, deserialized from the JSON body returned by Doris.
/// Property names match the JSON keys exactly, so no naming policy or attributes are required.
/// </summary>
public class StreamLoadResponse
{
    /// <summary>Transaction id reported for the load.</summary>
    public long TxnId { get; set; }

    /// <summary>Label of the load job.</summary>
    public string Label { get; set; }

    /// <summary>Comment reported for the load job.</summary>
    public string Comment { get; set; }

    /// <summary>Two-phase-commit flag; kept as a string because this model declares it as one.</summary>
    public string TwoPhaseCommit { get; set; }

    /// <summary>Overall load status; "Success" denotes a completed, visible load.</summary>
    public string Status { get; set; }

    /// <summary>
    /// Status of the job that already owns the label. Per the Doris Stream Load documentation this
    /// is only populated when <see cref="Status"/> is "Label Already Exists".
    /// </summary>
    public string ExistingJobStatus { get; set; }

    /// <summary>Human-readable status or error message.</summary>
    public string Message { get; set; }

    /// <summary>Total number of rows processed.</summary>
    public long NumberTotalRows { get; set; }

    /// <summary>Number of rows successfully loaded.</summary>
    public long NumberLoadedRows { get; set; }

    /// <summary>Number of rows filtered out (for example, rows failing data-quality checks).</summary>
    public long NumberFilteredRows { get; set; }

    /// <summary>Number of rows excluded by a where condition.</summary>
    public long NumberUnselectedRows { get; set; }

    /// <summary>Number of bytes loaded.</summary>
    public long LoadBytes { get; set; }

    /// <summary>Total load time, in milliseconds.</summary>
    public long LoadTimeMs { get; set; }

    /// <summary>Time spent beginning the transaction, in milliseconds.</summary>
    public long BeginTxnTimeMs { get; set; }

    /// <summary>Time spent obtaining the load execution plan, in milliseconds.</summary>
    public long StreamLoadPutTimeMs { get; set; }

    /// <summary>Time spent reading the data, in milliseconds.</summary>
    public long ReadDataTimeMs { get; set; }

    /// <summary>Time spent writing the data, in milliseconds.</summary>
    public long WriteDataTimeMs { get; set; }

    /// <summary>Time spent committing and publishing the transaction, in milliseconds.</summary>
    public long CommitAndPublishTimeMs { get; set; }

    /// <summary>
    /// URL from which the rejected rows can be inspected when the load had data-quality problems.
    /// The name mirrors the JSON key "ErrorURL" so it binds without a custom property name.
    /// </summary>
    public string ErrorURL { get; set; }
}
2.创建Doris StreamLoad接口
public interface IDorisApiService
{
/// <summary>
/// Loads CSV-formatted data into a Doris table via Stream Load.
/// </summary>
/// <param name="database">Name of the target database.</param>
/// <param name="table">Name of the target table.</param>
/// <param name="authorization">Authentication information, in the format username:pwd.</param>
/// <param name="content">The data to load, as a CSV-formatted string.</param>
/// <returns>The <see cref="StreamLoadResponse"/> describing the outcome of the load.</returns>
StreamLoadResponse StreamLoad(string database, string table, string authorization, string content);
}
3.实现接口,核心代码,逻辑并不复杂,组装一个http请求所需的内容。
需要注意的是:(1)示例中 csv 格式字符串的分隔符为 '\t',而不是常用的逗号,这也是官方默认的分隔方式;如果你想用其他分隔符,需要在 header 里配置 column_separator。建议不要用逗号,因为涉及到复杂的 json 字符串时,里面的逗号会导致解析异常,即便官方文档里有相关的处理方式(enclose),似乎仍然存在问题。(2)我们请求了两次:第一次请求会被重定向到 BE 节点的地址,然后用此地址再次请求,这是正常的。(3)我们采用的 format 是 csv_with_names,第一行是列名,请确保其与数据库 table 的列顺序和数量保持一致。(4)注意认证信息格式为 username:password,即数据库用户的用户名和密码。
/// <summary>
/// <see cref="IDorisApiService"/> implementation that performs a Doris Stream Load over HTTP.
/// </summary>
/// <remarks>
/// The first PUT carries no body; when the server answers with a 307 redirect the same request is
/// re-issued, this time with the payload, against the redirect target. Redirects are followed by
/// hand so that the method, body and headers are re-attached explicitly, which requires the
/// injected <see cref="HttpClient"/> to have automatic redirects disabled.
/// </remarks>
public class DorisApiService : IDorisApiService
{
    private readonly HttpClient _httpClient;

    /// <summary>Creates the service on top of a pre-configured <see cref="HttpClient"/>.</summary>
    /// <param name="httpClient">Client whose base address points at the Doris HTTP endpoint.</param>
    /// <exception cref="ArgumentNullException"><paramref name="httpClient"/> is null.</exception>
    public DorisApiService(HttpClient httpClient)
    {
        _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
    }

    /// <summary>
    /// Loads <paramref name="content"/> into <paramref name="database"/>.<paramref name="table"/>
    /// using the csv_with_names format with a tab column separator.
    /// </summary>
    /// <param name="database">Target database name.</param>
    /// <param name="table">Target table name.</param>
    /// <param name="authorization">
    /// Credentials as "username:password". A value without a colon is assumed to be already
    /// Base64-encoded and is sent unchanged.
    /// </param>
    /// <param name="content">CSV payload whose first line holds the column names.</param>
    /// <returns>The parsed response when Doris reports the status "Success".</returns>
    /// <exception cref="ArgumentException"><paramref name="database"/> or <paramref name="table"/> is null or blank.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="authorization"/> or <paramref name="content"/> is null.</exception>
    /// <exception cref="HttpRequestException">The final HTTP status code is not a success code; the message is the response body.</exception>
    /// <exception cref="InvalidOperationException">
    /// The redirect has no Location header, or the reported status is not "Success"; in the latter
    /// case the message is the raw response body.
    /// </exception>
    public StreamLoadResponse StreamLoad(string database, string table, string authorization, string content)
    {
        if (string.IsNullOrWhiteSpace(database))
        {
            throw new ArgumentException("Database name must not be null or blank.", nameof(database));
        }

        if (string.IsNullOrWhiteSpace(table))
        {
            throw new ArgumentException("Table name must not be null or blank.", nameof(table));
        }

        if (authorization is null)
        {
            throw new ArgumentNullException(nameof(authorization));
        }

        if (content is null)
        {
            throw new ArgumentNullException(nameof(content));
        }

        var url = $"/api/{database}/{table}/_stream_load";
        var credentials = ToBasicCredentials(authorization);

        Uri redirectTarget;
        using (var initialRequest = CreateRequest(new Uri(url, UriKind.RelativeOrAbsolute), credentials, null))
        using (var initialResponse = _httpClient.Send(initialRequest, HttpCompletionOption.ResponseHeadersRead))
        {
            // HttpStatusCode.TemporaryRedirect and HttpStatusCode.RedirectKeepVerb are both 307,
            // so a single comparison covers what the two-sided check used to test.
            if (initialResponse.StatusCode != HttpStatusCode.TemporaryRedirect)
            {
                return ReadResult(initialResponse);
            }

            redirectTarget = initialResponse.Headers.Location
                ?? throw new InvalidOperationException(
                    "Doris answered the stream load request with a redirect but no Location header.");
        }

        // The first response is disposed before the second request so its connection is released.
        using (var loadRequest = CreateRequest(redirectTarget, credentials, content))
        using (var loadResponse = _httpClient.Send(loadRequest, HttpCompletionOption.ResponseHeadersRead))
        {
            return ReadResult(loadResponse);
        }
    }

    /// <summary>Builds a stream load PUT request with the shared headers and an optional body.</summary>
    private static HttpRequestMessage CreateRequest(Uri requestUri, string credentials, string content)
    {
        var request = new HttpRequestMessage(HttpMethod.Put, requestUri);
        if (content != null)
        {
            request.Content = new StringContent(content, Encoding.UTF8, "text/plain");
        }

        request.Headers.Add("Expect", "100-continue");
        request.Headers.Add("format", "csv_with_names");
        request.Headers.Add("column_separator", "\t");
        request.Headers.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", credentials);
        return request;
    }

    /// <summary>Converts "username:password" into the Base64 token required by the Basic scheme.</summary>
    private static string ToBasicCredentials(string authorization)
    {
        // The Base64 alphabet has no ':', so a colon reliably marks raw credentials that still
        // need encoding; anything else is passed through for callers that pre-encode.
        return authorization.IndexOf(':') >= 0
            ? Convert.ToBase64String(Encoding.UTF8.GetBytes(authorization))
            : authorization;
    }

    /// <summary>Reads the response body synchronously and maps it to a result or an exception.</summary>
    private static StreamLoadResponse ReadResult(HttpResponseMessage response)
    {
        string responseBody;
        // Synchronous read to match the synchronous Send, instead of blocking on a Task.
        using (var bodyStream = response.Content.ReadAsStream())
        using (var reader = new System.IO.StreamReader(bodyStream, Encoding.UTF8))
        {
            responseBody = reader.ReadToEnd();
        }

        if (!response.IsSuccessStatusCode)
        {
            throw new HttpRequestException(responseBody, null, response.StatusCode);
        }

        var result = JsonSerializer.Deserialize<StreamLoadResponse>(responseBody);

        // NOTE(review): the Doris documentation describes "Publish Timeout" as a completed load whose
        // data becomes visible later; it is still reported as a failure here to keep the existing
        // contract. Confirm whether callers should retry in that case.
        if (result is null || !string.Equals(result.Status, "Success", StringComparison.Ordinal))
        {
            throw new InvalidOperationException(responseBody);
        }

        return result;
    }
}
4.Program配置
services
    .AddHttpClient<IDorisApiService, DorisApiService>(dorisClient =>
    {
        // The Doris request address and port come from configuration: settings.ApiHost
        dorisClient.Timeout = TimeSpan.FromSeconds(300);
        dorisClient.BaseAddress = new Uri(settings.ApiHost);
    })
    .ConfigurePrimaryHttpMessageHandler(() =>
    {
        // Automatic redirects are switched off because DorisApiService follows the redirect itself.
        var primaryHandler = new HttpClientHandler();
        primaryHandler.AllowAutoRedirect = false;
        return primaryHandler;
    });