Kalle Raita | 30b8a87 | 2014-04-03 11:22:31 +0300 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2015 Google Inc. All rights reserved. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package main |
| 18 | |
| 19 | import ( |
| 20 | "encoding/json" |
| 21 | "flag" |
| 22 | "fmt" |
| 23 | "./third_party/websocket" |
| 24 | "log" |
| 25 | "net/http" |
| 26 | "net/url" |
| 27 | "reflect" |
| 28 | "./cherry" |
| 29 | "./rtdb" |
| 30 | "./service" |
| 31 | "regexp" |
| 32 | "time" |
| 33 | "strconv" |
| 34 | "io" |
| 35 | ) |
| 36 | |
| 37 | var rtdbServer *rtdb.Server |
| 38 | var testRunner *cherry.TestRunner |
| 39 | var rpcServices *service.Library |
| 40 | var objectModel *rtdb.ObjectModel |
| 41 | |
| 42 | // \todo [petri] is there a better place for this? |
| 43 | |
// EncodeJSONRPC serializes a JSON-RPC 2.0 message that invokes method.
//
// params is wrapped into a one-element array, so the encoded "params" member
// is always a positional parameter list. The message carries no "id" member.
// Any error reported by json.Marshal is returned unchanged.
func EncodeJSONRPC(method string, params interface{}) ([]byte, error) {
	envelope := struct {
		JsonRPC string        `json:"jsonrpc"`
		Method  string        `json:"method"`
		Params  []interface{} `json:"params"`
	}{
		JsonRPC: "2.0",
		Method:  method,
		Params:  []interface{}{params},
	}
	return json.Marshal(envelope)
}
| 56 | |
// debugStr shortens s for log output.
//
// Strings of at most 100 bytes are returned unchanged. Longer strings are cut
// to at most 97 bytes and suffixed with "...". The cut is always made at the
// start of a UTF-8 sequence so that a multi-byte character is never split in
// half (which would put invalid UTF-8 into the log). For pure ASCII input the
// result is exactly the first 97 bytes followed by "...".
func debugStr(s string) string {
	const (
		maxLen   = 100
		ellipsis = "..."
	)
	if len(s) <= maxLen {
		return s
	}

	// Ranging over a string yields the byte offset of every rune start; keep
	// the last one that still leaves room for the ellipsis.
	cut := 0
	for offset := range s {
		if offset > maxLen-len(ellipsis) {
			break
		}
		cut = offset
	}
	return s[:cut] + ellipsis
}
| 64 | |
| 65 | // Websocket handling. |
| 66 | |
| 67 | func wsHandler (w http.ResponseWriter, r *http.Request) { |
| 68 | conn, err := websocket.Upgrade(w, r, nil, 1024, 1024) |
| 69 | if _, ok := err.(websocket.HandshakeError); ok { |
| 70 | http.Error(w, "Not a websocket handshake", 400) |
| 71 | return |
| 72 | } else if err != nil { |
| 73 | log.Println(err) |
| 74 | return |
| 75 | } |
| 76 | log.Println("[socket] connection established") |
| 77 | defer conn.Close() |
| 78 | |
| 79 | // Channels. |
| 80 | type msgReceived struct { |
| 81 | content []byte |
| 82 | err error |
| 83 | } |
| 84 | |
| 85 | type sendMessage struct { |
| 86 | content []byte |
| 87 | } |
| 88 | |
| 89 | connQueue := make(chan interface{}, 1000) // \note A too small capacity causes deadlock; this is a temporary fix. |
| 90 | |
| 91 | // RTDB listener callbacks. |
| 92 | onSubscribeObject := func(objects []rtdb.Object) { |
| 93 | // Send initial object to client. |
| 94 | log.Printf("[socket] send %d subscribed object to client\n", len(objects)) |
| 95 | encoded, err := EncodeJSONRPC("rtdb.InitObjects", objects) |
| 96 | if err != nil { panic(err) } |
| 97 | connQueue <- sendMessage{ encoded } |
| 98 | } |
| 99 | |
| 100 | onUpdateObjects := func(changes []rtdb.Update) { |
| 101 | // \todo [petri] currently handled inside server thread -- move to client thread instead? |
| 102 | // \todo [petri] we should be sending {id:ops}, but changes instead contains values |
| 103 | getObjIds := func() []string { |
| 104 | names := make([]string, len(changes)) |
| 105 | for ndx, change := range changes { names[ndx] = change.ObjId } |
| 106 | return names |
| 107 | } |
| 108 | log.Printf("[socket] update client objects: %q\n", getObjIds()) |
| 109 | |
| 110 | type UpdateObjects struct { |
| 111 | Changes []rtdb.Update `json:"changes"` |
| 112 | } |
| 113 | |
| 114 | update := UpdateObjects { |
| 115 | Changes: changes, |
| 116 | } |
| 117 | |
| 118 | encoded, err := EncodeJSONRPC("rtdb.UpdateObjects", update) |
| 119 | // \todo [petri] what to do on error?! |
| 120 | if err == nil { |
| 121 | connQueue <- sendMessage{ encoded } |
| 122 | } |
| 123 | } |
| 124 | |
| 125 | // Create client. |
| 126 | listener := rtdbServer.NewListener(onSubscribeObject, onUpdateObjects) |
| 127 | rpcClient := cherry.NewRPCClient(listener) |
| 128 | |
| 129 | // Handle message reading in separate goroutine. |
| 130 | connectionClosed := make(chan struct{}) |
| 131 | go func() { |
| 132 | // Handle incoming messages in this thread. |
| 133 | for { |
| 134 | select { |
| 135 | case <-connectionClosed: |
| 136 | return |
| 137 | default: |
| 138 | _, input, err := conn.ReadMessage() |
| 139 | connQueue <- msgReceived{ input, err } |
| 140 | if err != nil { <-connectionClosed; return } |
| 141 | } |
| 142 | } |
| 143 | }() |
| 144 | |
| 145 | // Handle socket connection. |
| 146 | func() { |
| 147 | for { |
| 148 | op := <- connQueue |
| 149 | switch op.(type) { |
| 150 | case msgReceived: |
| 151 | msg := op.(msgReceived) |
| 152 | if msg.err != nil { |
| 153 | log.Printf("[socket] error reading message: %s\n", msg.err) |
| 154 | return |
| 155 | } |
| 156 | |
| 157 | result, err := rpcServices.CallJSON(rpcClient, msg.content) |
| 158 | if err == nil { |
| 159 | log.Printf("[socket] result: %s\n", debugStr(string(result))) |
| 160 | err = conn.WriteMessage(websocket.TextMessage, result) |
| 161 | if err != nil { log.Printf("[socket] WARNING: unable to write message: %s\n", err); return } |
| 162 | } else { |
| 163 | log.Printf("[socket] WARNING: invalid rpc call '%s': '%s'\n", string(msg.content), err) |
| 164 | // \todo [petri] return error! |
| 165 | } |
| 166 | |
| 167 | case sendMessage: |
| 168 | // Send message to client. |
| 169 | msg := op.(sendMessage) |
| 170 | err = conn.WriteMessage(websocket.TextMessage, msg.content) |
| 171 | if err != nil { log.Printf("[socket] WARNING: unable to write message: %s\n", err); return } |
| 172 | } |
| 173 | } |
| 174 | }() |
| 175 | |
| 176 | connectionClosed <- struct{}{} |
| 177 | |
| 178 | // Cleanup. |
| 179 | log.Printf("[socket] destroy listener\n") |
| 180 | rtdbServer.DestroyListener(listener) |
| 181 | |
| 182 | // Close channel and eat all messages (to avoid blocking senders). |
| 183 | close(connQueue) |
| 184 | for _ = range connQueue {} |
| 185 | |
| 186 | log.Println("[socket] client disconnected") |
| 187 | } |
| 188 | |
// handleSingleParamGETRequest is a helper for handling GET requests of the
// form /uriDirectoryName/param.
//
// It extracts everything following "/uriDirectoryName/" in the request path
// and passes it to writeResponse, which is expected to write the response
// body. uriDirectoryName is matched literally.
//
// Responses generated by the helper itself:
//   - 400 if the request method is not GET,
//   - 404 if the request URI cannot be parsed, if the path does not contain
//     "/uriDirectoryName/", or if writeResponse returns an error,
//   - 500 if the path pattern cannot be compiled.
func handleSingleParamGETRequest(response http.ResponseWriter, request *http.Request, uriDirectoryName string, writeResponse func(param string) error) {
	if request.Method != "GET" {
		http.Error(response, "Invalid request", 400)
		return
	}

	uri, err := url.ParseRequestURI(request.RequestURI)
	if err != nil {
		log.Printf("[getrequest] Error when parsing URI: %v\n", err)
		http.Error(response, err.Error(), 404)
		return
	}

	// Quote the directory name so that characters such as '.' or '+' are not
	// interpreted as regular expression operators.
	re, err := regexp.Compile(fmt.Sprintf("/%s/(.*)", regexp.QuoteMeta(uriDirectoryName)))
	if err != nil {
		log.Printf("[getrequest] Error when compiling path pattern: %v\n", err)
		http.Error(response, err.Error(), 500)
		return
	}

	// FindStringSubmatch returns nil when the path does not match; indexing
	// the result unconditionally would panic.
	match := re.FindStringSubmatch(uri.Path)
	if match == nil {
		log.Printf("[getrequest] Path '%s' does not match /%s/\n", uri.Path, uriDirectoryName)
		http.Error(response, "Not found", 404)
		return
	}

	if err := writeResponse(match[1]); err != nil {
		log.Printf("[getrequest] Finished with error: %v\n", err)
		http.Error(response, err.Error(), 404)
		return
	}
	log.Printf("[getrequest] Finished successfully\n")
}
| 214 | |
| 215 | // Batch execution log export request handler. |
| 216 | func executionLogExportHandler (response http.ResponseWriter, request *http.Request) { |
| 217 | handleSingleParamGETRequest(response, request, "executionLog", func(batchResultId string) error { |
| 218 | log.Printf("[execlog] Handling request for batch '%s'\n", batchResultId) |
| 219 | str, err := testRunner.QueryBatchExecutionLog(batchResultId) |
| 220 | if err != nil { return err } |
| 221 | _, err = io.WriteString(response, str) |
| 222 | return err |
| 223 | }) |
| 224 | } |
| 225 | |
// batchResultExportHttpResponse adapts an http.ResponseWriter to the
// destination type handed to cherry.WriteBatchResultExport: export data is
// streamed into the response body and the export's file name is announced
// through the Content-Disposition header.
type batchResultExportHttpResponse struct {
	response http.ResponseWriter
}

// Write forwards p to the underlying HTTP response body.
func (w batchResultExportHttpResponse) Write(p []byte) (int, error) {
	return w.response.Write(p)
}

// SetFilename marks the response as a file download called name.
//
// The header is replaced rather than appended, so calling SetFilename more
// than once never produces multiple Content-Disposition header lines; the
// last name wins. It only has an effect if called before the response header
// has been written.
func (w batchResultExportHttpResponse) SetFilename(name string) {
	w.response.Header().Set("Content-Disposition", "attachment; filename="+strconv.Quote(name))
}
| 237 | |
| 238 | // QPA export request handler. |
| 239 | func exportHandler (response http.ResponseWriter, request *http.Request) { |
| 240 | handleSingleParamGETRequest(response, request, "export", func(batchResultId string) error { |
| 241 | log.Printf("[export] Handling request for batch '%s'\n", batchResultId) |
| 242 | return cherry.WriteBatchResultExport(batchResultExportHttpResponse{response}, rtdbServer, batchResultId) |
| 243 | }) |
| 244 | } |
| 245 | |
| 246 | // QPA import request handler. |
| 247 | func importHandler (response http.ResponseWriter, request *http.Request) { |
| 248 | if request.Method != "POST" || request.RequestURI != "/import/" { |
| 249 | http.Error(response, "Invalid request", 400) |
| 250 | return |
| 251 | } |
| 252 | |
Jarkko Pöyry | d17ce40 | 2015-04-07 13:31:45 -0700 | [diff] [blame] | 253 | parts, err := request.MultipartReader() |
| 254 | if err != nil { |
| 255 | http.Error(response, "Expected a multipart upload", 400) |
| 256 | return |
| 257 | } |
| 258 | |
Kalle Raita | 30b8a87 | 2014-04-03 11:22:31 +0300 | [diff] [blame] | 259 | log.Printf("[import] Received request with Content-Length %d\n", request.ContentLength) |
Jarkko Pöyry | d17ce40 | 2015-04-07 13:31:45 -0700 | [diff] [blame] | 260 | |
| 261 | anyImportSucceeded := false |
| 262 | anyImportFailed := false |
| 263 | |
| 264 | for { |
| 265 | file, err := parts.NextPart(); |
| 266 | if err != nil { |
| 267 | break |
| 268 | } |
| 269 | |
| 270 | startTime := time.Now() |
Jarkko Pöyry | 62f5c2d | 2015-04-07 16:15:42 -0700 | [diff] [blame] | 271 | batchResultDefaultName := "Import-" + startTime.Format("2006-Jan-02 15:04:05") |
| 272 | err = testRunner.ImportBatch(batchResultDefaultName, file, request.ContentLength) |
Jarkko Pöyry | d17ce40 | 2015-04-07 13:31:45 -0700 | [diff] [blame] | 273 | if err != nil { |
| 274 | log.Printf("[import] Import failed with error %v\n", err) |
| 275 | anyImportFailed = true |
| 276 | } else { |
| 277 | log.Printf("[import] Single import finished\n") |
| 278 | anyImportSucceeded = true |
| 279 | } |
| 280 | |
| 281 | file.Close() |
| 282 | } |
| 283 | |
| 284 | request.Body.Close() |
| 285 | |
| 286 | if !anyImportFailed { |
| 287 | // no failures |
| 288 | response.WriteHeader(http.StatusOK) |
| 289 | } else if !anyImportSucceeded { |
| 290 | // no successes |
| 291 | http.Error(response, "Import failed", 500) |
| 292 | } else { |
| 293 | // both failures and successes |
| 294 | http.Error(response, "Partial failure", 207) |
| 295 | } |
Kalle Raita | 30b8a87 | 2014-04-03 11:22:31 +0300 | [diff] [blame] | 296 | } |
| 297 | |
// setUpFileMappings registers the static file handlers on the default HTTP
// mux:
//   - /LICENSE serves the main license file,
//   - / serves the "client" directory as-is,
//   - each third-party directory is served under its own URL prefix.
//
// Keeping the third-party locations in a table allows them to be remapped for
// release packages or when versions change.
func setUpFileMappings() {
	type remap struct {
		dir    string // directory on disk
		prefix string // URL prefix it is served under
	}

	thirdParty := []remap{
		{dir: "third_party/ui-bootstrap", prefix: "/ui-bootstrap/"},
		{dir: "third_party/angular", prefix: "/lib/angular/"},
		{dir: "third_party/ui-router", prefix: "/lib/ui-router/"},
		{dir: "third_party/jquery", prefix: "/lib/jquery/"},
		{dir: "third_party/spin", prefix: "/lib/spin/"},
		{dir: "third_party/angular-spinner", prefix: "/lib/angular-spinner/"},
		{dir: "third_party/bootstrap", prefix: "/lib/bootstrap/"},
		{dir: "third_party/sax", prefix: "/lib/sax/"},
		{dir: "third_party/underscore", prefix: "/lib/underscore/"},
		{dir: "third_party/angular-tree-control", prefix: "/lib/angular-tree-control/"},
	}

	// Special case the main license.
	serveLicense := func(w http.ResponseWriter, r *http.Request) {
		http.ServeFile(w, r, "LICENSE")
	}
	http.HandleFunc("/LICENSE", serveLicense)

	// The client hierarchy is served as-is.
	http.Handle("/", http.FileServer(http.Dir("client")))

	// Re-map third-party to point to the listed directories.
	for i := range thirdParty {
		entry := thirdParty[i]
		files := http.FileServer(http.Dir(entry.dir))
		http.Handle(entry.prefix, http.StripPrefix(entry.prefix, files))
	}
}
| 335 | |
| 336 | // Main |
| 337 | |
| 338 | func main () { |
| 339 | port := flag.Int("port", 8080, "port to serve on") |
| 340 | dbName := flag.String("db", "Cherry.db", "database file name (use :memory: for in-memory)") |
| 341 | // \todo [2014-10-02 pyry] Temporary workaround to make cherry usable on non-SSD systems. |
| 342 | // Real fix is to write better DB backend. |
| 343 | dbSyncIo := flag.Bool("db-sync-io", true, "use synchronous IO in DB backend") |
| 344 | flag.Parse() |
| 345 | |
| 346 | // Initialize Cherry object model. |
| 347 | objectModel := rtdb.NewObjectModel(cherry.GetObjectTypes()) |
| 348 | |
| 349 | // Create RTDB instance. |
| 350 | log.Println("[main] create sql backend") |
| 351 | rtdbBackend, err := rtdb.NewSQLiteBackend(*dbName, *dbSyncIo) |
| 352 | if err != nil { log.Println(err); return } |
| 353 | log.Println("[main] create rtdb") |
| 354 | rtdbServer, err = rtdb.NewServer(objectModel, rtdbBackend) |
| 355 | if err != nil { log.Println(err); return } |
| 356 | |
| 357 | // Initialize database data model. |
| 358 | cherry.InitDB(rtdbServer) |
| 359 | |
| 360 | cherry.StartADBDeviceListPoller(rtdbServer, 2 * time.Second) |
| 361 | |
| 362 | // Initialize RPC services. |
| 363 | testRunner = cherry.NewTestRunner(rtdbServer) |
| 364 | rpcHandler := cherry.NewRPCHandler(rtdbServer, testRunner) |
| 365 | rpcServices = service.NewServiceLibrary( |
| 366 | reflect.TypeOf((*cherry.RPCClient)(nil)), |
| 367 | []service.HandlerSpec{ |
| 368 | service.HandlerSpec{ "rtdb", rpcHandler }, |
| 369 | }) |
| 370 | |
| 371 | // Setup http handling. |
Kalle Raita | 30b8a87 | 2014-04-03 11:22:31 +0300 | [diff] [blame] | 372 | setUpFileMappings() |
| 373 | |
| 374 | http.HandleFunc("/ws", wsHandler) |
| 375 | http.HandleFunc("/executionLog/", executionLogExportHandler) |
| 376 | http.HandleFunc("/export/", exportHandler) |
| 377 | http.HandleFunc("/import/", importHandler) |
| 378 | |
| 379 | // Fire up http server! |
| 380 | addr := fmt.Sprintf("127.0.0.1:%d", *port) |
| 381 | log.Printf("Listening on port %d\n", *port) |
| 382 | err = http.ListenAndServe(addr, nil) |
| 383 | log.Println(err.Error()) |
| 384 | } |