209 lines
7.1 KiB
Java
209 lines
7.1 KiB
Java
|
|
package com.mystarvindata.vindata.utils;
|
||
|
|
|
||
|
|
import com.mystarvindata.vindata.config.CloudConfig;
|
||
|
|
import com.tencentcloudapi.common.Credential;
|
||
|
|
import com.tencentcloudapi.common.profile.ClientProfile;
|
||
|
|
import com.tencentcloudapi.common.profile.HttpProfile;
|
||
|
|
import com.tencentcloudapi.dlc.v20210125.DlcClient;
|
||
|
|
import com.tencentcloudapi.dlc.v20210125.models.*;
|
||
|
|
import lombok.RequiredArgsConstructor;
|
||
|
|
import lombok.extern.slf4j.Slf4j;
|
||
|
|
import org.springframework.stereotype.Component;
|
||
|
|
|
||
|
|
import java.nio.charset.StandardCharsets;
|
||
|
|
import java.util.*;
|
||
|
|
|
||
|
|
@Slf4j
|
||
|
|
@Component
|
||
|
|
@RequiredArgsConstructor
|
||
|
|
public class WeDataUtils {
|
||
|
|
|
||
|
|
private final CloudConfig.WeDataConfig weDataConfig;
|
||
|
|
|
||
|
|
private static final long POLL_INTERVAL_MS = 1000;
|
||
|
|
private static final long MAX_WAIT_MS = 300_000;
|
||
|
|
|
||
|
|
public List<Map<String, Object>> queryWeData(String sql, List<Object> params) {
|
||
|
|
if (params != null && !params.isEmpty()) {
|
||
|
|
sql = applyParams(sql, params);
|
||
|
|
}
|
||
|
|
|
||
|
|
try {
|
||
|
|
DlcClient dlcClient = buildDlcClient();
|
||
|
|
|
||
|
|
String taskId = submitTask(dlcClient, sql);
|
||
|
|
log.info("WeData DLC task submitted, taskId={}", taskId);
|
||
|
|
|
||
|
|
TaskResultInfo resultInfo = pollTaskResult(dlcClient, taskId);
|
||
|
|
|
||
|
|
return parseResultSet(resultInfo);
|
||
|
|
} catch (Exception e) {
|
||
|
|
log.error("WeData DLC query failed, sql={}", sql, e);
|
||
|
|
return new ArrayList<>();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
public List<Map<String, Object>> queryWeData(String sql) {
|
||
|
|
return queryWeData(sql, null);
|
||
|
|
}
|
||
|
|
|
||
|
|
private DlcClient buildDlcClient() {
|
||
|
|
Credential credential = new Credential(
|
||
|
|
weDataConfig.getAccessKey(),
|
||
|
|
weDataConfig.getSecretKey()
|
||
|
|
);
|
||
|
|
|
||
|
|
HttpProfile httpProfile = new HttpProfile();
|
||
|
|
httpProfile.setEndpoint(
|
||
|
|
weDataConfig.getEndpoint() != null
|
||
|
|
? weDataConfig.getEndpoint()
|
||
|
|
: "dlc.tencentcloudapi.com"
|
||
|
|
);
|
||
|
|
|
||
|
|
ClientProfile clientProfile = new ClientProfile();
|
||
|
|
clientProfile.setHttpProfile(httpProfile);
|
||
|
|
|
||
|
|
String region = weDataConfig.getRegion() != null
|
||
|
|
? weDataConfig.getRegion()
|
||
|
|
: "ap-shanghai";
|
||
|
|
|
||
|
|
return new DlcClient(credential, region, clientProfile);
|
||
|
|
}
|
||
|
|
|
||
|
|
private String submitTask(DlcClient dlcClient, String sql) throws Exception {
|
||
|
|
String encodedSql = Base64.getEncoder().encodeToString(
|
||
|
|
sql.getBytes(StandardCharsets.UTF_8)
|
||
|
|
);
|
||
|
|
|
||
|
|
|
||
|
|
SQLTask sqlTask = new SQLTask();
|
||
|
|
sqlTask.setSQL(encodedSql);
|
||
|
|
|
||
|
|
Task task = new Task();
|
||
|
|
task.setSparkSQLTask(sqlTask);
|
||
|
|
// task.setSQLTask(sqlTask);
|
||
|
|
|
||
|
|
|
||
|
|
CreateTaskRequest request = new CreateTaskRequest();
|
||
|
|
request.setTask(task);
|
||
|
|
// request.setDatasourceConnectionName("gen6_prod_aiag");
|
||
|
|
|
||
|
|
request.setDataEngineName("gen6_prod_aiag");
|
||
|
|
request.setDatabaseName("dws");
|
||
|
|
|
||
|
|
CreateTaskResponse response = dlcClient.CreateTask(request);
|
||
|
|
return response.getTaskId();
|
||
|
|
}
|
||
|
|
|
||
|
|
private TaskResultInfo pollTaskResult(DlcClient dlcClient, String taskId) throws Exception {
|
||
|
|
long startTime = System.currentTimeMillis();
|
||
|
|
|
||
|
|
while (true) {
|
||
|
|
DescribeTaskResultRequest request = new DescribeTaskResultRequest();
|
||
|
|
request.setTaskId(taskId);
|
||
|
|
request.setMaxResults(1000L);
|
||
|
|
|
||
|
|
DescribeTaskResultResponse response = dlcClient.DescribeTaskResult(request);
|
||
|
|
TaskResultInfo taskInfo = response.getTaskInfo();
|
||
|
|
Long state = taskInfo.getState();
|
||
|
|
|
||
|
|
// 2=success
|
||
|
|
if (state != null && state == 2L) {
|
||
|
|
return taskInfo;
|
||
|
|
}
|
||
|
|
// -1=failed, -3=cancelled
|
||
|
|
if (state != null && (state == -1L || state == -3L)) {
|
||
|
|
throw new RuntimeException(
|
||
|
|
"WeData DLC task failed, state=" + state
|
||
|
|
+ ", message=" + taskInfo.getOutputMessage()
|
||
|
|
);
|
||
|
|
}
|
||
|
|
|
||
|
|
if (System.currentTimeMillis() - startTime > MAX_WAIT_MS) {
|
||
|
|
throw new RuntimeException(
|
||
|
|
"WeData DLC task timeout after " + MAX_WAIT_MS + "ms, taskId=" + taskId
|
||
|
|
);
|
||
|
|
}
|
||
|
|
|
||
|
|
Thread.sleep(POLL_INTERVAL_MS);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
private List<Map<String, Object>> parseResultSet(TaskResultInfo resultInfo) {
|
||
|
|
List<Map<String, Object>> resultList = new ArrayList<>();
|
||
|
|
|
||
|
|
Column[] schema = resultInfo.getResultSchema();
|
||
|
|
String resultSet = resultInfo.getResultSet();
|
||
|
|
|
||
|
|
if (schema == null || resultSet == null || resultSet.isEmpty()) {
|
||
|
|
return resultList;
|
||
|
|
}
|
||
|
|
|
||
|
|
String[] columnNames = new String[schema.length];
|
||
|
|
for (int i = 0; i < schema.length; i++) {
|
||
|
|
columnNames[i] = schema[i].getName();
|
||
|
|
}
|
||
|
|
|
||
|
|
// ResultSet is a JSON string: array of arrays, e.g. [["val1","val2"],["val3","val4"]]
|
||
|
|
resultSet = resultSet.trim();
|
||
|
|
if (!resultSet.startsWith("[")) {
|
||
|
|
return resultList;
|
||
|
|
}
|
||
|
|
|
||
|
|
// Parse the JSON array of arrays manually to avoid extra dependencies
|
||
|
|
// Format: [["v1","v2"],["v3","v4"]]
|
||
|
|
List<List<String>> rows = parseJsonArrayOfArrays(resultSet);
|
||
|
|
for (List<String> row : rows) {
|
||
|
|
Map<String, Object> map = new HashMap<>();
|
||
|
|
for (int i = 0; i < columnNames.length && i < row.size(); i++) {
|
||
|
|
map.put(columnNames[i], row.get(i));
|
||
|
|
}
|
||
|
|
resultList.add(map);
|
||
|
|
}
|
||
|
|
|
||
|
|
return resultList;
|
||
|
|
}
|
||
|
|
|
||
|
|
private List<List<String>> parseJsonArrayOfArrays(String json) {
|
||
|
|
List<List<String>> result = new ArrayList<>();
|
||
|
|
// Use hutool or simple JSON parsing
|
||
|
|
// The SDK returns format: [["a","b"],["c","d"]]
|
||
|
|
try {
|
||
|
|
cn.hutool.json.JSONArray outerArray = new cn.hutool.json.JSONArray(json);
|
||
|
|
for (int i = 0; i < outerArray.size(); i++) {
|
||
|
|
cn.hutool.json.JSONArray innerArray = outerArray.getJSONArray(i);
|
||
|
|
List<String> row = new ArrayList<>();
|
||
|
|
for (int j = 0; j < innerArray.size(); j++) {
|
||
|
|
Object val = innerArray.get(j);
|
||
|
|
row.add(val == null ? null : val.toString());
|
||
|
|
}
|
||
|
|
result.add(row);
|
||
|
|
}
|
||
|
|
} catch (Exception e) {
|
||
|
|
log.warn("Failed to parse DLC result set JSON", e);
|
||
|
|
}
|
||
|
|
return result;
|
||
|
|
}
|
||
|
|
|
||
|
|
private String applyParams(String sql, List<Object> params) {
|
||
|
|
StringBuilder sb = new StringBuilder();
|
||
|
|
int paramIndex = 0;
|
||
|
|
for (int i = 0; i < sql.length(); i++) {
|
||
|
|
char c = sql.charAt(i);
|
||
|
|
if (c == '?' && paramIndex < params.size()) {
|
||
|
|
Object param = params.get(paramIndex++);
|
||
|
|
if (param == null) {
|
||
|
|
sb.append("NULL");
|
||
|
|
} else if (param instanceof Number) {
|
||
|
|
sb.append(param);
|
||
|
|
} else {
|
||
|
|
sb.append('\'').append(param.toString().replace("'", "''")).append('\'');
|
||
|
|
}
|
||
|
|
} else {
|
||
|
|
sb.append(c);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return sb.toString();
|
||
|
|
}
|
||
|
|
}
|