代码语言: Go
1func CreateClient() *sqs.SQS {
2 ses, _ := session.NewSession(&aws.Config{
3 Region: aws.String("cn-north-1"),
4 Credentials: credentials.NewStaticCredentials("your AccessKey",
5 "your SecretKey", ""),
6 Endpoint: aws.String("http://jqs.cn-north-1.jdcloud.com"),
7 DisableSSL: aws.Bool(true),
8 })
9 _, err := ses.Config.Credentials.Get()
10 if err != nil {
11 log.Fatal("凭据创建失败", err)
12 }
13 client := sqs.New(ses)
14 return client
15}
16
17func CreateQueueTask(name string) string {
18 resp, err := sqsClient.CreateQueue(&sqs.CreateQueueInput{
19 QueueName: aws.String(name),
20 })
21 if err != nil {
22 log.Println("Create Queue Failed:", err)
23 return ""
24 }
25 return *resp.QueueUrl
26}
1func SendTask(url string, message interface{}) {
2 body, _ := json.Marshal(message)
3 _, err := sqsClient.SendMessage(&sqs.SendMessageInput{
4 MessageBody: aws.String(string(body)),
5 QueueUrl: aws.String(url),
6 })
7 if err != nil {
8 log.Println("Send Message Failed:", err)
9 return
10 }
11 return
12}
1func ReceiveMessageTask(url string) interface{} {
2 result, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{
3 AttributeNames: aws.StringSlice([]string{"All"}),
4 MaxNumberOfMessages: aws.Int64(1),
5 MessageAttributeNames: aws.StringSlice([]string{"All"}),
6 QueueUrl: aws.String(url),
7 VisibilityTimeout: aws.Int64(30),
8 WaitTimeSeconds: aws.Int64(0),
9 })
10 log.Println(*result.Messages[0].Body)
11 if err != nil {
12 log.Println("Receive Message Failed:", err)
13 return ""
14 }
15
16 if len(result.Messages) > 0 {
17 _, delErr := sqsClient.DeleteMessage(&sqs.DeleteMessageInput{
18 QueueUrl: aws.String(url),
19 ReceiptHandle: result.Messages[0].ReceiptHandle,
20 })
21 if err != nil {
22 log.Println("Delete Message Failed:", delErr)
23 }
24
25 message := new(gpsMessage)
26 Err := json.Unmarshal([]byte(*result.Messages[0].Body), message)
27 if Err != nil {
28 log.Println("Receive Message Unmarshal Failed", Err)
29 return ""
30 }
31 return message
32 }
33 return ""
34}
35
// Build sends an HTTP request with the given method to url, with body encoded
// as JSON as the request payload, and returns the raw response body.
//
// The Accept and Content-Type headers are both set to application/json. A nil
// body is encoded as the JSON literal null. The HTTP status code is not
// inspected. When encoding the body, building the request, performing it or
// reading the response fails, the error is logged and nil is returned.
func Build(url string, method string, body interface{}) []byte {
	bodyData, err := json.Marshal(body)
	if err != nil {
		log.Println("Marshal Request Body Failed:", err)
		return nil
	}

	request, err := http.NewRequest(method, url, bytes.NewReader(bodyData))
	if err != nil {
		log.Println("Build Request Failed:", err)
		return nil
	}
	request.Header.Set("Accept", "application/json")
	request.Header.Set("Content-Type", "application/json")

	// Send the request to the server.
	client := &http.Client{}
	response, err := client.Do(request)
	if err != nil {
		log.Println("Send Request Failed:", err)
		return nil
	}
	// Always release the response body.
	defer response.Body.Close()

	// Read what the server sent back to the client.
	r, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Println("Read Response Failed:", err)
		return nil
	}
	return r
}
49
50func PostMessageToES(p string, body interface{}) string {
51 postReturn := new(postRes)
52 postResponse := Build(p, "POST", body)
53 err := json.Unmarshal(postResponse, postReturn)
54 if err != nil {
55 log.Println("Unmarshal Failed", err)
56 }
57 jsonS, _ := json.Marshal(postReturn)
58 log.Println("postResult:", string(jsonS))
59 return postReturn.Id
60}
1func GetMessageFromES(p string) {
2 message := new(esMessage)
3 getResponse := Build(p, "GET", nil)
4 log.Println("getPath:", p)
5 Err := json.Unmarshal(getResponse, message)
6 if Err != nil {
7 log.Println("Unmarshal Failed", Err)
8 }
9 jsonM, _ := json.Marshal(message)
10 log.Println("getResult:", string(jsonM))
11}
结果示例:
京东云的队列服务作为 Serverless 开发中的 BaaS 服务,实现了中间件服务的免运维和毫秒级扩缩容能力,支持京东云的合作伙伴零成本启动业务并按使用量付费,帮助用户解决资源扩缩容和阈值监控等复杂问题。结合函数服务 FaaS 使用,可以满足更丰富的场景,并且可以调用整个京东云 Serverless 生态,打造 21 世纪基于云原生的开放式全新应用。点击“了解”,快来体验吧!