1: <?php
2: /**
3: * Fluent-PHP-Logger
4: *
5: * Copyright (C) 2011 Shuhei Tanuma
6: *
7: * Licensed under the Apache License, Version 2.0 (the "License");
8: * you may not use this file except in compliance with the License.
9: * You may obtain a copy of the License at
10: *
11: * http://www.apache.org/licenses/LICENSE-2.0
12: *
13: * Unless required by applicable law or agreed to in writing, software
14: * distributed under the License is distributed on an "AS IS" BASIS,
15: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16: * See the License for the specific language governing permissions and
17: * limitations under the License.
18: */
19: namespace Fluent\Logger;
20:
21: /**
22: * Fluent Logger
23: *
* Fluent Logger client communicates with Fluentd using JSON formatted messages.
25: *
26: * @author Shuhei Tanuma <stanuma@zynga.com>
27: */
class FluentLogger extends BaseLogger
{
    /** Timeout, in seconds, passed to stream_socket_client() when connecting. */
    const CONNECTION_TIMEOUT = 3;

    /** Read / write timeout, in seconds, applied with stream_set_timeout(). */
    const SOCKET_TIMEOUT = 3;

    /** Maximum number of retries after a write reports zero bytes written. */
    const MAX_WRITE_RETRY = 10;

    /* Fluent uses port 24224 as a default port */
    const DEFAULT_LISTEN_PORT = 24224;

    const DEFAULT_ADDRESS = "127.0.0.1";

    /** @var string primary tag prepended to every message */
    protected $tag;

    /** @var string fluentd host */
    protected $host;

    /** @var int fluentd port */
    protected $port;

    /** @var resource|null stream socket; unset until the first post() */
    protected $socket;

    /**
     * create fluent logger object.
     *
     * No connection is opened here; the socket is created lazily by post().
     *
     * @param string $tag primary tag
     * @param string $host
     * @param int    $port
     */
    public function __construct($tag, $host = FluentLogger::DEFAULT_ADDRESS, $port = FluentLogger::DEFAULT_LISTEN_PORT)
    {
        $this->tag  = $tag;
        $this->host = $host;
        $this->port = $port;
    }

    /**
     * Factory helper: builds a logger for the given tag / host / port.
     *
     * Note: this currently returns a new instance on every call; it does not
     * cache or share a logger.
     *
     * @todo fixed singleton api.
     * @param string $tag primary tag
     * @param string $host
     * @param int    $port
     * @return FluentLogger created logger object.
     */
    public static function open($tag, $host = FluentLogger::DEFAULT_ADDRESS, $port = FluentLogger::DEFAULT_LISTEN_PORT)
    {
        $logger = new self($tag, $host, $port);
        //\Fluent::$logger = $logger;
        return $logger;
    }

    /**
     * create a connection to specified fluentd
     *
     * @return void
     * @throws \Exception when the TCP connection cannot be established.
     */
    protected function connect()
    {
        $errno  = 0;
        $errstr = '';

        // stream_socket_client() raises a warning on failure that cannot be
        // turned off without an ini setting, so it is silenced with "@" and
        // the failure is reported through the exception below instead.
        $socket = @stream_socket_client(
            sprintf("tcp://%s:%d", $this->host, $this->port),
            $errno,
            $errstr,
            self::CONNECTION_TIMEOUT,
            \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_PERSISTENT
        );

        if (!$socket) {
            // Prefer the error reported by stream_socket_client() itself:
            // error_get_last() may be null or describe an unrelated error.
            $reason = (string)$errstr;
            if ($reason === '') {
                $last   = error_get_last();
                $reason = (is_array($last) && isset($last['message']))
                    ? $last['message']
                    : 'unknown error';
            }
            throw new \Exception(
                sprintf("could not connect to tcp://%s:%d: %s", $this->host, $this->port, $reason),
                (int)$errno
            );
        }

        // set read / write timeout.
        stream_set_timeout($socket, self::SOCKET_TIMEOUT);
        $this->socket = $socket;
    }

    /**
     * create a connection if Fluent Logger hasn't a socket connection.
     *
     * @return void
     * @throws \Exception when a new connection is needed but cannot be opened.
     */
    protected function reconnect()
    {
        if (!is_resource($this->socket)) {
            $this->connect();
        }
    }

    /**
     * send a message to specified fluentd.
     *
     * Every failure (encoding, connecting, writing) is reported through
     * error_log() and a false return value; no exception leaves this method.
     * After a connection or write failure the socket is discarded so that the
     * next call opens a fresh connection.
     *
     * @param mixed  $data       message payload
     * @param string $additional optional tag suffix, appended as "<tag>.<additional>"
     * @return bool true when the whole message was written, false otherwise.
     */
    public function post($data, $additional = null)
    {
        $packed = self::pack_impl($this->tag, $data, $additional);
        if (!is_string($packed)) {
            // json_encode() yields false for values it cannot encode; without
            // this check the loop below would be skipped and success reported.
            error_log(sprintf("could not pack message (json error code %d)", json_last_error()));
            return false;
        }

        $length  = strlen($packed);
        $buffer  = $packed;
        $written = 0;
        $retry   = 0;

        try {
            $this->reconnect();

            // fwrite() on a socket may write only part of the buffer, so keep
            // sending the unsent remainder until everything has gone out.
            while ($written < $length) {
                $nwrite = $this->write($buffer);
                if ($nwrite === false) {
                    // could not write messages to the socket.
                    throw new \Exception("could not write message");
                } elseif ($nwrite === "") {
                    // sometimes fwrite returns null string.
                    // probably connection aborted.
                    throw new \Exception("connection aborted");
                } elseif ($nwrite === 0) {
                    // nothing was written: retry, at most MAX_WRITE_RETRY times.
                    if ($retry >= self::MAX_WRITE_RETRY) {
                        throw new \Exception("failed fwrite retry: max retry count");
                    }
                    $retry++;
                    continue;
                }
                $written += $nwrite;
                $buffer   = substr($packed, $written);
            }
        } catch (\Exception $e) {
            // Drop the socket so a broken connection is not reused.
            $this->dropSocket();
            error_log($e->getMessage());
            return false;
        }

        return true;
    }

    /**
     * pack php object to fluentd message format.
     * fluentd v0.9.20 can read json message format directly.
     * for now, this method send message to fluentd as json object.
     *
     * The message is the JSON encoding of array(tag, current unix time, data).
     * An $additional value that empty() treats as empty (null, "", "0", ...)
     * leaves the tag unchanged.
     *
     * @param string $tag        fluentd tag.
     * @param mixed  $data
     * @param string $additional optional tag.
     * @return string|false message data, or false when json_encode() fails.
     */
    public static function pack_impl($tag, $data, $additional = null)
    {
        if (!empty($additional)) {
            $tag .= "." . $additional;
        }
        return json_encode(array($tag, time(), $data));
    }

    /**
     * Destructor; performs no cleanup.
     *
     * The socket is opened with STREAM_CLIENT_PERSISTENT and is not closed
     * here.
     *
     * @return void
     */
    public function __destruct()
    {
    }

    /**
     * write data
     *
     * Thin wrapper around fwrite() on the current socket.
     *
     * @param string $data
     * @return int|false number of bytes written, or false on error.
     */
    protected function write($data)
    {
        return fwrite($this->socket, $data);
    }

    /**
     * Close and forget the current socket so that reconnect() opens a new one.
     *
     * @return void
     */
    private function dropSocket()
    {
        if (is_resource($this->socket)) {
            fclose($this->socket);
        }
        $this->socket = null;
    }
}
187: