开发者中心 > 专栏 > 内容详情
分享
  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

利用京东云Serverless服务快速构建5G时代的IoT应用

  • 京东云Serverless团队
  • 2020-01-14
10月31日,在2019年中国国际信息通信展览会上,工信部宣布:5G商用正式启动。5G商用时代来了!
5G的商用,使得数据传输速度、响应速度、连接数据、数据传输量、传输可靠性等方面都有了显著的提升,这一技术的突破才使得很多领域的应用场景得以真正的落地实施,走进普通人的生活,而这其中就包括物联网。
虽然物联网的概念很简明,就是把物品与互联网相连接并进行信息交换和通讯,从而实现万物智能化,但是由于各种原因,商业化的应用并没有大规模的落地,其中一方面是由于网络传输等原因的限制。如今,5G的到来使得物联网应用在网络层面不再受限,更好地促进了物联网的发展。对于企业来说,快速构建一个商用的物联网服务,抢占先机就显得尤为重要。
京东云Serverless服务正适应了如今企业的这种需求。Serverless一系列产品生态可以让用户以业务为导向,无需关心底层服务器部署以及承载能力,实施周期短,无预付价格按使用量付费。这些特性是适应互联网5G快速时代、快速构建的不二选择,为企业节省成本的同时解决了技术上的难题。
以下面一个车联网数据为例子,我们来看一下如何用队列服务来构建一个高可用高可靠的无服务应用。在车联网应用中客户端负载可能会在24小时内扩展/缩减3、4个数量级,这些特性天然适合Serverless服务动态扩缩,按量付费。

案例场景:车联网中搭载数据收集传感器的汽车向云端传输行驶数据,云端利用队列服务削峰填谷、动态扩缩的能力接收数据,然后转存到Elasticsearch进行更好的数据检索和应用,为用户提供更好的服务或者为公司提供更多的商业价值。
需求:队列服务SDK,队列服务接入点地址,Elasticsearch接入点地址(已经创建好ES实例),京东云用户AK/SK

代码语言: Go

Step1
创建队列服务客户端以及资源创建
1func CreateClient() *sqs.SQS {
2    ses, _ := session.NewSession(&aws.Config{
3        Region: aws.String("cn-north-1"),
4        Credentials: credentials.NewStaticCredentials("your AccessKey",
5            "your SecretKey"""),
6        Endpoint:   aws.String("http://jqs.cn-north-1.jdcloud.com"),
7        DisableSSL: aws.Bool(true),
8    })
9    _, err := ses.Config.Credentials.Get()
10    if err != nil {
11        log.Fatal("凭据创建失败", err)
12    }
13    client := sqs.New(ses)
14    return client
15}
16
17func CreateQueueTask(name string) string {
18    resp, err := sqsClient.CreateQueue(&sqs.CreateQueueInput{
19        QueueName: aws.String(name),
20    })
21    if err != nil {
22        log.Println("Create Queue Failed:", err)
23        return ""
24    }
25    return *resp.QueueUrl
26}
Step2
每个车辆设备发送消息
1func SendTask(url string, message interface{}) {
2    body, _ := json.Marshal(message)
3    _, err := sqsClient.SendMessage(&sqs.SendMessageInput{
4        MessageBody: aws.String(string(body)),
5        QueueUrl:    aws.String(url),
6    })
7    if err != nil {
8        log.Println("Send Message Failed:", err)
9        return
10    }
11    return
12}
测试数据:
1.jpg
测试机器的配置:CPU64、内存256G、带宽100Mbps(京东云云主机)
场景:公网;send-单条(JQS)
Step3
从队列服务中拉取消息转存到Elasticsearch中
1func ReceiveMessageTask(url string) interface{} {
2    result, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{
3        AttributeNames:        aws.StringSlice([]string{"All"}),
4        MaxNumberOfMessages:   aws.Int64(1),
5        MessageAttributeNames: aws.StringSlice([]string{"All"}),
6        QueueUrl:              aws.String(url),
7        VisibilityTimeout:     aws.Int64(30),
8        WaitTimeSeconds:       aws.Int64(0),
9    })
10    log.Println(*result.Messages[0].Body)
11    if err != nil {
12        log.Println("Receive Message Failed:", err)
13        return ""
14    }
15
16    if len(result.Messages) > 0 {
17        _, delErr := sqsClient.DeleteMessage(&sqs.DeleteMessageInput{
18            QueueUrl:      aws.String(url),
19            ReceiptHandle: result.Messages[0].ReceiptHandle,
20        })
21        if err != nil {
22            log.Println("Delete Message Failed:", delErr)
23        }
24
25        message := new(gpsMessage)
26        Err := json.Unmarshal([]byte(*result.Messages[0].Body), message)
27        if Err != nil {
28            log.Println("Receive Message Unmarshal Failed", Err)
29            return ""
30        }
31        return message
32    }
33    return ""
34}
35
36func Build(url string, method string, body interface{}) []byte {
37    client := &http.Client{}
38    //向服务端发送get请求
39    bodyData, _ := json.Marshal(body)
40    request, _ := http.NewRequest(method, url, bytes.NewReader(bodyData))
41
42    request.Header.Set("Accept""application/json")
43    request.Header.Set("Content-Type""application/json")
44    //接收服务端返回给客户端的信息
45    response, _ := client.Do(request)
46    r, _ := ioutil.ReadAll(response.Body)
47    return r
48}
49
50func PostMessageToES(p string, body interface{}) string {
51    postReturn := new(postRes)
52    postResponse := Build(p, "POST", body)
53    err := json.Unmarshal(postResponse, postReturn)
54    if err != nil {
55        log.Println("Unmarshal Failed", err)
56    }
57    jsonS, _ := json.Marshal(postReturn)
58    log.Println("postResult:"string(jsonS))
59    return postReturn.Id
60}
Step4
从Elasticsearch按照需求索引搜索信息
1func GetMessageFromES(p string) {
2    message := new(esMessage)
3    getResponse := Build(p, "GET"nil)
4    log.Println("getPath:", p)
5    Err := json.Unmarshal(getResponse, message)
6    if Err != nil {
7        log.Println("Unmarshal Failed", Err)
8    }
9    jsonM, _ := json.Marshal(message)
10    log.Println("getResult:"string(jsonM))
11}

结果示例:

1.jpg

可以按照车辆信息进行数据的检索,绘制出汽车动态地图,利用这些数据来帮助自动驾驶、动态导航、车辆健康度分析等多种场景进行实现。

京东云的队列服务作为Serverless开发中的BaaS服务,实现了中间件服务的无运维和毫秒级扩缩能力,支持京东云的合作伙伴零成本启动业务和按使用量付费的模式,帮助用户解决资源扩缩和阈值监控等复杂问题。结合函数服务FaaS使用,可以满足更丰富的场景,并且调用整个京东云Serverless生态,打造基于云原生21世纪的开放式的全新应用。点击“了解”,快进行体验吧!


京东云Serverless团队

相关产品

  • 队列服务
    基于serverless架构的全托管消息队列服务