44 #include "D4StreamMarshaller.h" 45 #ifdef USE_POSIX_THREADS 46 #include "MarshallerThread.h" 49 #if USE_XDR_FOR_IEEE754_ENCODING 65 inline uint8_t* WriteVarint64ToArrayInline(uint64_t value, uint8_t* target) {
68 uint32_t part0 =
static_cast<uint32_t
>(value );
69 uint32_t part1 =
static_cast<uint32_t
>(value >> 28);
70 uint32_t part2 =
static_cast<uint32_t
>(value >> 56);
83 if (part0 < (1 << 14)) {
84 if (part0 < (1 << 7)) {
90 if (part0 < (1 << 21)) {
97 if (part1 < (1 << 14)) {
98 if (part1 < (1 << 7)) {
101 size = 6;
goto size6;
104 if (part1 < (1 << 21)) {
105 size = 7;
goto size7;
107 size = 8;
goto size8;
112 if (part2 < (1 << 7)) {
113 size = 9;
goto size9;
115 size = 10;
goto size10;
121 size10: target[9] =
static_cast<uint8_t
>((part2 >> 7) | 0x80);
122 size9 : target[8] =
static_cast<uint8_t
>((part2 ) | 0x80);
123 size8 : target[7] =
static_cast<uint8_t
>((part1 >> 21) | 0x80);
124 size7 : target[6] =
static_cast<uint8_t
>((part1 >> 14) | 0x80);
125 size6 : target[5] =
static_cast<uint8_t
>((part1 >> 7) | 0x80);
126 size5 : target[4] =
static_cast<uint8_t
>((part1 ) | 0x80);
127 size4 : target[3] =
static_cast<uint8_t
>((part0 >> 21) | 0x80);
128 size3 : target[2] =
static_cast<uint8_t
>((part0 >> 14) | 0x80);
129 size2 : target[1] =
static_cast<uint8_t
>((part0 >> 7) | 0x80);
130 size1 : target[0] =
static_cast<uint8_t
>((part0 ) | 0x80);
132 target[size-1] &= 0x7F;
133 return target + size;
137 #if USE_XDR_FOR_IEEE754_ENCODING 138 void D4StreamMarshaller::m_serialize_reals(
char *val,
unsigned int num,
int width, Type type)
140 dods_uint64 size = num * width;
142 char *buf =
new char[size];
144 xdrmem_create(&xdr, &buf[0], size, XDR_ENCODE);
146 if(!xdr_array(&xdr, &val, (
unsigned int *)&num, size, width, XDRUtils::xdr_coder(type)))
147 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 array");
149 if (xdr_getpos(&xdr) != size)
150 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 array");
156 dods_float32 *lbuf =
reinterpret_cast<dods_float32*
>(&buf[0]);
158 dods_int32 *i =
reinterpret_cast<dods_int32*
>(lbuf++);
163 dods_float64 *lbuf =
reinterpret_cast<dods_float64*
>(&buf[0]);
165 dods_int64 *i =
reinterpret_cast<dods_int64*
>(lbuf++);
170 #ifdef USE_POSIX_THREADS 171 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
173 tm->increment_child_thread_count();
174 tm->start_thread(MarshallerThread::write_thread, d_out, buf, size);
179 d_out.write(&buf[0], size);
200 D4StreamMarshaller::D4StreamMarshaller(ostream &out,
bool write_data) :
201 d_out(out), d_write_data(write_data), tm(0)
203 assert(
sizeof(std::streamsize) >=
sizeof(int64_t));
205 #if USE_XDR_FOR_IEEE754_ENCODING 209 xdrmem_create(&d_scalar_sink, d_ieee754_buf,
sizeof(dods_float64), XDR_ENCODE);
212 #ifdef USE_POSIX_THREADS 218 out.exceptions(ostream::failbit | ostream::badbit);
221 D4StreamMarshaller::~D4StreamMarshaller()
223 #if USE_XDR_FOR_IEEE754_ENCODING 224 xdr_destroy(&d_scalar_sink);
250 oss.setf(ios::hex, ios::basefield);
251 oss << setfill(
'0') << setw(8) << d_checksum.
GetCrc32();
264 Crc32::checksum chk = d_checksum.
GetCrc32();
265 #ifdef USE_POSIX_THREADS 266 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
268 d_out.write(reinterpret_cast<char*>(&chk),
sizeof(Crc32::checksum));
277 d_checksum.
AddData(reinterpret_cast<const uint8_t*>(data), len);
280 void D4StreamMarshaller::put_byte(dods_byte val)
285 DBG( std::cerr <<
"put_byte: " << val << std::endl );
286 #ifdef USE_POSIX_THREADS 288 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
290 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_byte));
294 void D4StreamMarshaller::put_int8(dods_int8 val)
299 DBG( std::cerr <<
"put_int8: " << val << std::endl );
300 #ifdef USE_POSIX_THREADS 301 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
303 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_int8));
307 void D4StreamMarshaller::put_int16(dods_int16 val)
312 #ifdef USE_POSIX_THREADS 313 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
315 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_int16));
319 void D4StreamMarshaller::put_int32(dods_int32 val)
324 #ifdef USE_POSIX_THREADS 325 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
327 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_int32));
331 void D4StreamMarshaller::put_int64(dods_int64 val)
336 #ifdef USE_POSIX_THREADS 337 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
339 d_out.write(reinterpret_cast<const char*>(&val),
sizeof(dods_int64));
343 void D4StreamMarshaller::put_float32(dods_float32 val)
345 #if !USE_XDR_FOR_IEEE754_ENCODING 346 assert(std::numeric_limits<float>::is_iec559);
351 #ifdef USE_POSIX_THREADS 352 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
354 d_out.write(reinterpret_cast<const char*>(&val),
sizeof(dods_float32));
364 if (std::numeric_limits<float>::is_iec559 ) {
365 #ifdef USE_POSIX_THREADS 366 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
368 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_float32));
371 if (!xdr_setpos(&d_scalar_sink, 0))
372 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
374 if (!xdr_float(&d_scalar_sink, &val))
375 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
377 if (xdr_getpos(&d_scalar_sink) !=
sizeof(dods_float32))
378 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float32 variable");
383 dods_int32 *i =
reinterpret_cast<dods_int32*
>(&d_ieee754_buf);
386 #ifdef USE_POSIX_THREADS 387 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
389 d_out.write(d_ieee754_buf,
sizeof(dods_float32));
395 void D4StreamMarshaller::put_float64(dods_float64 val)
397 #if !USE_XDR_FOR_IEEE754_ENCODING 398 assert(std::numeric_limits<double>::is_iec559);
403 #ifdef USE_POSIX_THREADS 404 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
406 d_out.write(reinterpret_cast<const char*>(&val),
sizeof(dods_float64));
412 if (std::numeric_limits<double>::is_iec559) {
413 #ifdef USE_POSIX_THREADS 414 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
416 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_float64));}
419 if (!xdr_setpos(&d_scalar_sink, 0))
420 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
422 if (!xdr_double(&d_scalar_sink, &val))
423 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
425 if (xdr_getpos(&d_scalar_sink) !=
sizeof(dods_float64))
426 throw InternalErr(__FILE__, __LINE__,
"Error serializing a Float64 variable");
431 dods_int64 *i =
reinterpret_cast<dods_int64*
>(&d_ieee754_buf);
435 #ifdef USE_POSIX_THREADS 436 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
438 d_out.write(d_ieee754_buf,
sizeof(dods_float64));
444 void D4StreamMarshaller::put_uint16(dods_uint16 val)
449 #ifdef USE_POSIX_THREADS 450 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
452 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_uint16));
456 void D4StreamMarshaller::put_uint32(dods_uint32 val)
461 #ifdef USE_POSIX_THREADS 462 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
464 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_uint32));
468 void D4StreamMarshaller::put_uint64(dods_uint64 val)
473 #ifdef USE_POSIX_THREADS 474 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
476 d_out.write(reinterpret_cast<char*>(&val),
sizeof(dods_uint64));
490 #ifdef USE_POSIX_THREADS 491 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
493 d_out.write(reinterpret_cast<const char*>(&count),
sizeof(int64_t));
496 void D4StreamMarshaller::put_str(
const string &val)
501 int64_t len = val.length();
502 #ifdef USE_POSIX_THREADS 503 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
505 d_out.write(reinterpret_cast<const char*>(&len),
sizeof(int64_t));
506 d_out.write(val.data(), val.length());
510 void D4StreamMarshaller::put_url(
const string &val)
515 void D4StreamMarshaller::put_opaque_dap4(
const char *val, int64_t len)
523 #ifdef USE_POSIX_THREADS 524 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
526 d_out.write(reinterpret_cast<const char*>(&len),
sizeof(int64_t));
528 char *byte_buf =
new char[len];
529 memcpy(byte_buf, val, len);
531 tm->increment_child_thread_count();
534 d_out.write(reinterpret_cast<const char*>(&len),
sizeof(int64_t));
535 d_out.write(val, len);
548 assert(num_bytes >= 0);
553 #ifdef USE_POSIX_THREADS 554 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
556 char *buf =
new char[num_bytes];
557 memcpy(buf, val, num_bytes);
559 tm->increment_child_thread_count();
562 d_out.write(val, num_bytes);
570 assert(num_elem >= 0);
571 assert(elem_size > 0);
577 assert(!
"Don't call this method for bytes, use put_vector(val, bytes) instead");
582 assert(!(num_elem & 0x4000000000000000));
583 bytes = num_elem << 1;
586 assert(!(num_elem & 0x6000000000000000));
587 bytes = num_elem << 2;
590 assert(!(num_elem & 0x7000000000000000));
591 bytes = num_elem << 3;
594 bytes = num_elem * elem_size;
601 #ifdef USE_POSIX_THREADS 602 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
604 char *buf =
new char[bytes];
605 memcpy(buf, val, bytes);
607 tm->increment_child_thread_count();
610 d_out.write(val, bytes);
626 #if !USE_XDR_FOR_IEEE754_ENCODING 628 assert(std::numeric_limits<float>::is_iec559);
630 assert(num_elem >= 0);
635 assert(!(num_elem & 0xe000000000000000));
637 num_elem = num_elem << 2;
642 #ifdef USE_POSIX_THREADS 643 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
645 char *buf =
new char[num_elem];
646 memcpy(buf, val, num_elem);
648 tm->increment_child_thread_count();
651 d_out.write(val, num_elem);
657 assert(num_elem >= 0);
662 assert(!(num_elem & 0xe000000000000000));
664 int64_t bytes = num_elem << 2;
669 if (!std::numeric_limits<float>::is_iec559) {
671 m_serialize_reals(val, num_elem, 4, type);
674 #ifdef USE_POSIX_THREADS 675 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
677 char *buf =
new char[bytes];
678 memcpy(buf, val, bytes);
680 tm->increment_child_thread_count();
683 d_out.write(val, bytes);
700 #if !USE_XDR_FOR_IEEE754_ENCODING 702 assert(std::numeric_limits<double>::is_iec559);
704 assert(num_elem >= 0);
706 assert(!(num_elem & 0xf000000000000000));
708 num_elem = num_elem << 3;
713 #ifdef USE_POSIX_THREADS 714 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
716 char *buf =
new char[num_elem];
717 memcpy(buf, val, num_elem);
719 tm->increment_child_thread_count();
722 d_out.write(val, num_elem);
727 assert(num_elem >= 0);
732 assert(!(num_elem & 0xe000000000000000));
734 int64_t bytes = num_elem << 3;
739 if (!std::numeric_limits<double>::is_iec559) {
741 m_serialize_reals(val, num_elem, 8, type);
744 #ifdef USE_POSIX_THREADS 745 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
747 char *buf =
new char[bytes];
748 memcpy(buf, val, bytes);
750 tm->increment_child_thread_count();
753 d_out.write(val, bytes);
801 throw InternalErr(__FILE__, __LINE__,
"Array of String should not be passed to put_vector.");
804 throw InternalErr(__FILE__, __LINE__,
"Array of Array not allowed.");
807 case dods_structure_c:
808 case dods_sequence_c:
809 throw InternalErr(__FILE__, __LINE__,
"Array of String should not be passed to put_vector.");
812 throw InternalErr(__FILE__, __LINE__,
"Grid is not part of DAP4.");
815 throw InternalErr(__FILE__, __LINE__,
"Unknown datatype.");
822 strm << DapIndent::LMarg <<
"D4StreamMarshaller::dump - (" << (
void *)
this <<
")" << endl;
static void * write_thread(void *arg)
virtual void put_count(int64_t count)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
virtual void dump(std::ostream &strm) const
dump the contents of this object to the specified ostream
virtual void put_vector_float32(char *val, int64_t num_elem)
Write a fixed size vector.
checksum GetCrc32() const
Type
Identifies the data type.
A class for software fault reporting.
void AddData(const uint8_t *pData, const uint32_t length)
virtual void put_vector_part(char *, unsigned int, int, Type)
virtual void checksum_update(const void *data, unsigned long len)
virtual void put_vector(char *val, int64_t num_bytes)
Write a fixed size vector.
virtual string get_checksum()
virtual void put_vector_float64(char *val, int64_t num_elem)
Write a fixed size vector of float64s.
virtual void put_checksum()
Write the checksum Write the checksum for the data sent since the last call to reset_checksum() to th...
bool is_host_big_endian()
Does this host use big-endian byte order?
virtual void reset_checksum()